Skip to content

Commit

Permalink
[native] Respect summarize flag in TaskResource
Browse files Browse the repository at this point in the history
When the summarize flag is set, do not send detailed per-pipeline stats
(same as in the Java implementation) to save CPU on serialization /
deserialization.

Co-authored-by: Andrii Rosa <andriirosa@fb.com>
  • Loading branch information
2 people authored and amitkdutta committed Jan 15, 2025
1 parent 85eeacf commit 1b57e1c
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 240 deletions.
416 changes: 229 additions & 187 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ struct PrestoTask {
return updateStatusLocked();
}

protocol::TaskInfo updateInfo() {
protocol::TaskInfo updateInfo(bool summarize) {
std::lock_guard<std::mutex> l(mutex);
return updateInfoLocked();
return updateInfoLocked(summarize);
}

/// Turns the task numbers (per state) into a string.
Expand All @@ -172,7 +172,7 @@ struct PrestoTask {

/// Invoked to update presto task status from the updated velox task stats.
protocol::TaskStatus updateStatusLocked();
protocol::TaskInfo updateInfoLocked();
protocol::TaskInfo updateInfoLocked(bool summarize);

folly::dynamic toJson() const;

Expand All @@ -191,7 +191,8 @@ struct PrestoTask {
void updateExecutionInfoLocked(
const velox::exec::TaskStats& veloxTaskStats,
const protocol::TaskStatus& prestoTaskStatus,
std::unordered_map<std::string, velox::RuntimeMetric>& taskRuntimeStats);
std::unordered_map<std::string, velox::RuntimeMetric>& taskRuntimeStats,
bool includePipelineStats);

void updateMemoryInfoLocked(
const velox::exec::TaskStats& veloxTaskStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ RowVectorPtr SystemDataSource::getTaskResults() {
std::vector<protocol::TaskInfo> taskInfos;
taskInfos.reserve(numRows);
for (const auto& taskEntry : taskMap) {
taskInfos.push_back(taskEntry.second->updateInfo());
taskInfos.push_back(taskEntry.second->updateInfo(true));
}

auto result = std::dynamic_pointer_cast<RowVector>(
Expand Down
48 changes: 29 additions & 19 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ void TaskManager::acknowledgeResults(
std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
const TaskId& taskId,
const std::exception_ptr& exception,
bool summarize,
long startProcessCpuTime) {
auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime);
{
Expand All @@ -399,7 +400,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
prestoTask->info.needsPlan = false;
}

auto info = prestoTask->updateInfo();
auto info = prestoTask->updateInfo(summarize);
return std::make_unique<TaskInfo>(info);
}

Expand Down Expand Up @@ -462,13 +463,15 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateTask(
const protocol::TaskId& taskId,
const protocol::TaskUpdateRequest& updateRequest,
const velox::core::PlanFragment& planFragment,
bool summarize,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
return createOrUpdateTaskImpl(
taskId,
planFragment,
updateRequest.sources,
updateRequest.outputIds,
summarize,
std::move(queryCtx),
startProcessCpuTime);
}
Expand All @@ -477,6 +480,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(
const protocol::TaskId& taskId,
const protocol::BatchTaskUpdateRequest& batchUpdateRequest,
const velox::core::PlanFragment& planFragment,
bool summarize,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
auto updateRequest = batchUpdateRequest.taskUpdateRequest;
Expand All @@ -488,6 +492,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(
planFragment,
updateRequest.sources,
updateRequest.outputIds,
summarize,
std::move(queryCtx),
startProcessCpuTime);
}
Expand All @@ -497,6 +502,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
const protocol::OutputBuffers& outputBuffers,
bool summarize,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
std::shared_ptr<exec::Task> execTask;
Expand All @@ -509,7 +515,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
// If the task is aborted, no need to do anything else.
// This takes care of DELETE task message coming before CREATE task.
if (prestoTask->info.taskStatus.state == protocol::TaskState::ABORTED) {
return std::make_unique<TaskInfo>(prestoTask->updateInfoLocked());
return std::make_unique<TaskInfo>(
prestoTask->updateInfoLocked(summarize));
}

// Uses a temp variable to store the created velox task to destroy it
Expand Down Expand Up @@ -627,7 +634,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(

// 'prestoTask' will exist by virtue of shared_ptr but may for example have
// been aborted.
auto info = prestoTask->updateInfoLocked(); // Presto task is locked above.
auto info =
prestoTask->updateInfoLocked(summarize); // Presto task is locked above.
if (auto promiseHolder = infoRequest.lock()) {
promiseHolder->promise.setValue(std::make_unique<protocol::TaskInfo>(info));
}
Expand All @@ -638,9 +646,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
return std::make_unique<TaskInfo>(info);
}

std::unique_ptr<TaskInfo> TaskManager::deleteTask(
const TaskId& taskId,
bool /*abort*/) {
std::unique_ptr<TaskInfo>
TaskManager::deleteTask(const TaskId& taskId, bool /*abort*/, bool summarize) {
LOG(INFO) << "Deleting task " << taskId;
// Fast, non-blocking delete and cancel serialized on 'taskMap'.
std::shared_ptr<facebook::presto::PrestoTask> prestoTask;
Expand Down Expand Up @@ -668,7 +675,7 @@ std::unique_ptr<TaskInfo> TaskManager::deleteTask(
}
prestoTask->info.stats.endTime =
util::toISOTimestamp(velox::getCurrentTimeMs());
prestoTask->updateInfoLocked();
prestoTask->updateInfoLocked(summarize);
} else {
// If task is not found then we observe DELETE message coming before
// CREATE. In that case we create the task with ABORTED state, so we know
Expand Down Expand Up @@ -809,8 +816,8 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
auto prestoTask = findOrCreateTask(taskId);
if (!currentState || !maxWait) {
// Return current TaskInfo without waiting.
promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
promise.setValue(std::make_unique<protocol::TaskInfo>(
prestoTask->updateInfo(summarize)));
prestoTask->updateCoordinatorHeartbeat();
return std::move(future).via(httpSrvCpuExecutor_);
}
Expand All @@ -831,12 +838,14 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(

return std::move(future)
.via(httpSrvCpuExecutor_)
.onTimeout(std::chrono::microseconds(maxWaitMicros), [prestoTask]() {
return std::make_unique<protocol::TaskInfo>(
prestoTask->updateInfo());
});
.onTimeout(
std::chrono::microseconds(maxWaitMicros),
[prestoTask, summarize]() {
return std::make_unique<protocol::TaskInfo>(
prestoTask->updateInfo(summarize));
});
}
info = prestoTask->updateInfoLocked();
info = prestoTask->updateInfoLocked(summarize);
}
if (currentState.value() != info.taskStatus.state ||
isFinalState(info.taskStatus.state)) {
Expand All @@ -850,16 +859,17 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(

prestoTask->task->stateChangeFuture(maxWaitMicros)
.via(httpSrvCpuExecutor_)
.thenValue([promiseHolder, prestoTask](auto&& /*done*/) {
promiseHolder->promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
.thenValue([promiseHolder, prestoTask, summarize](auto&& /*done*/) {
promiseHolder->promise.setValue(std::make_unique<protocol::TaskInfo>(
prestoTask->updateInfo(summarize)));
})
.thenError(
folly::tag_t<std::exception>{},
[promiseHolder, prestoTask](const std::exception& /*e*/) {
[promiseHolder, prestoTask, summarize](const std::exception& /*e*/) {
// We come here in the case of maxWait elapsed.
promiseHolder->promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
std::make_unique<protocol::TaskInfo>(
prestoTask->updateInfo(summarize)));
});
return std::move(future).via(httpSrvCpuExecutor_);
}
Expand Down
9 changes: 6 additions & 3 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@ class TaskManager {
std::unique_ptr<protocol::TaskInfo> createOrUpdateErrorTask(
const protocol::TaskId& taskId,
const std::exception_ptr& exception,
bool summarize,
long startProcessCpuTime);

std::unique_ptr<protocol::TaskInfo> createOrUpdateTask(
const protocol::TaskId& taskId,
const protocol::TaskUpdateRequest& updateRequest,
const velox::core::PlanFragment& planFragment,
bool summarize,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime);

std::unique_ptr<protocol::TaskInfo> createOrUpdateBatchTask(
const protocol::TaskId& taskId,
const protocol::BatchTaskUpdateRequest& batchUpdateRequest,
const velox::core::PlanFragment& planFragment,
bool summarize,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime);

Expand All @@ -84,9 +87,8 @@ class TaskManager {
const std::unordered_map<int64_t, std::shared_ptr<ResultRequest>>&
resultRequests);

std::unique_ptr<protocol::TaskInfo> deleteTask(
const protocol::TaskId& taskId,
bool abort);
std::unique_ptr<protocol::TaskInfo>
deleteTask(const protocol::TaskId& taskId, bool abort, bool summarize);

/// Remove old Finished, Cancelled, Failed and Aborted tasks.
/// Old is being defined by the lifetime of the task.
Expand Down Expand Up @@ -180,6 +182,7 @@ class TaskManager {
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
const protocol::OutputBuffers& outputBuffers,
bool summarize,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime);

Expand Down
26 changes: 18 additions & 8 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,36 +204,41 @@ proxygen::RequestHandler* TaskResource::acknowledgeResults(
}

proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
proxygen::HTTPMessage* /*message*/,
proxygen::HTTPMessage* message,
const std::vector<std::string>& pathMatch,
const std::function<std::unique_ptr<protocol::TaskInfo>(
const protocol::TaskId& taskId,
const std::string& updateJson,
const bool summarize,
long startProcessCpuTime)>& createOrUpdateFunc) {
protocol::TaskId taskId = pathMatch[1];
bool summarize = message->hasQueryParam("summarize");
return new http::CallbackRequestHandler(
[this, taskId, createOrUpdateFunc](
[this, taskId, summarize, createOrUpdateFunc](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(
httpSrvCpuExecutor_,
[this, &body, taskId, createOrUpdateFunc]() {
[this, &body, taskId, summarize, createOrUpdateFunc]() {
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();
std::string updateJson = util::extractMessageBody(body);

std::unique_ptr<protocol::TaskInfo> taskInfo;
try {
taskInfo = createOrUpdateFunc(
taskId, updateJson, startProcessCpuTimeNs);
taskId, updateJson, summarize, startProcessCpuTimeNs);
} catch (const velox::VeloxException& e) {
// Create an empty task and put the errors inside so that the next
// status fetch from the coordinator will catch the error and
// categorize it properly.
try {
taskInfo = taskManager_.createOrUpdateErrorTask(
taskId, std::current_exception(), startProcessCpuTimeNs);
taskId,
std::current_exception(),
summarize,
startProcessCpuTimeNs);
} catch (const velox::VeloxUserError& e) {
throw;
}
Expand Down Expand Up @@ -271,6 +276,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
pathMatch,
[&](const protocol::TaskId& taskId,
const std::string& updateJson,
const bool summarize,
long startProcessCpuTime) {
protocol::BatchTaskUpdateRequest batchUpdateRequest =
json::parse(updateJson);
Expand Down Expand Up @@ -308,6 +314,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
taskId,
batchUpdateRequest,
planFragment,
summarize,
std::move(queryCtx),
startProcessCpuTime);
});
Expand All @@ -321,6 +328,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
pathMatch,
[&](const protocol::TaskId& taskId,
const std::string& updateJson,
const bool summarize,
long startProcessCpuTime) {
protocol::TaskUpdateRequest updateRequest = json::parse(updateJson);
velox::core::PlanFragment planFragment;
Expand All @@ -344,6 +352,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
taskId,
updateRequest,
planFragment,
summarize,
std::move(queryCtx),
startProcessCpuTime);
});
Expand All @@ -358,18 +367,19 @@ proxygen::RequestHandler* TaskResource::deleteTask(
abort =
message->getQueryParam(protocol::PRESTO_ABORT_TASK_URL_PARAM) == "true";
}
bool summarize = message->hasQueryParam("summarize");

return new http::CallbackRequestHandler(
[this, taskId, abort](
[this, taskId, abort, summarize](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(
httpSrvCpuExecutor_,
[this, taskId, abort, downstream]() {
[this, taskId, abort, downstream, summarize]() {
std::unique_ptr<protocol::TaskInfo> taskInfo;
taskInfo = taskManager_.deleteTask(taskId, abort);
taskInfo = taskManager_.deleteTask(taskId, abort, summarize);
return std::move(taskInfo);
})
.via(folly::EventBaseManager::get()->getEventBase())
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/TaskResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class TaskResource {
const std::function<std::unique_ptr<protocol::TaskInfo>(
const protocol::TaskId&,
const std::string&,
const bool,
long)>& createOrUpdateFunc);

proxygen::RequestHandler* deleteTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ TEST_F(ServerOperationTest, taskEndpoint) {
taskId,
{},
planFragment,
true,
taskManager->getQueryContextManager()->findOrCreateQueryCtx(
taskId, updateRequest.session),
0);
Expand Down Expand Up @@ -215,7 +216,7 @@ TEST_F(ServerOperationTest, taskEndpoint) {

// Cleanup and shutdown
for (const auto& taskId : taskIds) {
taskManager->deleteTask(taskId, true);
taskManager->deleteTask(taskId, true, true);
}
taskManager->shutdown();
connector::unregisterConnector("test-hive");
Expand Down
Loading

0 comments on commit 1b57e1c

Please sign in to comment.