Skip to content

Commit

Permalink
Add an API to set the creation directory API in TaskManager
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandagits authored and rschlussel committed Dec 6, 2024
1 parent bf5fb82 commit adfb9c3
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 20 deletions.
58 changes: 45 additions & 13 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,38 @@ static void maybeSetupTaskSpillDirectory(
const auto includeNodeInSpillPath =
SystemConfig::instance()->includeNodeInSpillPath();
auto nodeConfig = NodeConfig::instance();
const auto taskSpillDirPath = TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
const auto [taskSpillDirPath, dateSpillDirPath] =
TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
includeNodeInSpillPath);
execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false);

execTask.setCreateSpillDirectoryCb(
[spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() {
auto fs = filesystems::getFileSystem(dateStrDir, nullptr);
// First create the top level directory (date string of the query) with
// TTL or other configs if set.
filesystems::DirectoryOptions options;
// Do not fail if the directory already exist because another process
// may have already created the dateStrDir.
options.failIfExists = false;
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
if (!config.empty()) {
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
config);
}
fs->mkdir(dateStrDir, options);

// After the parent directory is created,
// then create the spill directory for the actual task.
fs->mkdir(spillDir);
return spillDir;
});
}

// Keep outstanding Promises in RequestHandler's state itself.
Expand Down Expand Up @@ -379,7 +403,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
return std::make_unique<TaskInfo>(info);
}

/*static*/ std::string TaskManager::buildTaskSpillDirectoryPath(
/*static*/ std::tuple<std::string, std::string>
TaskManager::buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& nodeIp,
const std::string& nodeId,
Expand All @@ -397,13 +422,20 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
queryId.substr(6, 2))
: "1970-01-01";

std::string path;
folly::toAppend(fmt::format("{}/presto_native/", baseSpillPath), &path);
std::string taskSpillDirPath;
folly::toAppend(
fmt::format("{}/presto_native/", baseSpillPath), &taskSpillDirPath);
if (includeNodeInSpillPath) {
folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &path);
folly::toAppend(fmt::format("{}_{}/", nodeIp, nodeId), &taskSpillDirPath);
}
folly::toAppend(fmt::format("{}/{}/{}/", dateString, queryId, taskId), &path);
return path;

std::string dateSpillDirPath = taskSpillDirPath;
folly::toAppend(fmt::format("{}/", dateString), &dateSpillDirPath);

folly::toAppend(
fmt::format("{}/{}/{}/", dateString, queryId, taskId), &taskSpillDirPath);
return std::make_tuple(
std::move(taskSpillDirPath), std::move(dateSpillDirPath));
}

void TaskManager::getDataForResultRequests(
Expand Down
7 changes: 4 additions & 3 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ class TaskManager {
std::vector<std::string>& deadlockTasks,
std::vector<velox::exec::Task::OpCallInfo>& stuckOpCalls) const;

/// Build directory path for spilling for the given task.
/// Always returns non-empty string.
static std::string buildTaskSpillDirectoryPath(
/// Always returns tuple of non-empty string containing the spill directory
/// and the date string directory, which is parent directory of task spill
/// directory.
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& nodeIp,
const std::string& nodeId,
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ SystemConfig::SystemConfig() {
NUM_PROP(kDriverNumStuckOperatorsToDetachWorker, 8),
NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0),
STR_PROP(kSpillerFileCreateConfig, ""),
STR_PROP(kSpillerDirectoryCreateConfig, ""),
NONE_PROP(kSpillerSpillPath),
NUM_PROP(kShutdownOnsetSec, 10),
NUM_PROP(kSystemMemoryGb, 40),
Expand Down Expand Up @@ -409,6 +410,10 @@ std::string SystemConfig::spillerFileCreateConfig() const {
return optionalProperty<std::string>(kSpillerFileCreateConfig).value();
}

std::string SystemConfig::spillerDirectoryCreateConfig() const {
return optionalProperty<std::string>(kSpillerDirectoryCreateConfig).value();
}

folly::Optional<std::string> SystemConfig::spillerSpillPath() const {
return optionalProperty(kSpillerSpillPath);
}
Expand Down
8 changes: 8 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kSpillerFileCreateConfig{
"spiller.file-create-config"};

/// Config used to create spill directories. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr std::string_view kSpillerDirectoryCreateConfig{
"spiller.directory-create-config"};

static constexpr std::string_view kSpillerSpillPath{
"experimental.spiller-spill-path"};
static constexpr std::string_view kShutdownOnsetSec{"shutdown-onset-sec"};
Expand Down Expand Up @@ -734,6 +740,8 @@ class SystemConfig : public ConfigBase {

std::string spillerFileCreateConfig() const;

std::string spillerDirectoryCreateConfig() const;

folly::Optional<std::string> spillerSpillPath() const;

int32_t shutdownOnsetSec() const;
Expand Down
17 changes: 13 additions & 4 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1262,24 +1262,33 @@ TEST_P(TaskManagerTest, aggregationSpill) {

TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) {
EXPECT_EQ(
"fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/",
std::make_tuple(
"fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/",
"fs::/base/presto_native/192.168.10.2_19/2022-12-20/"),
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", true));
EXPECT_EQ(
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/Q100/Task22/",
std::make_tuple(
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/Q100/Task22/",
"fsx::/root/presto_native/192.16.10.2_sample_node_id/1970-01-01/"),
TaskManager::buildTaskSpillDirectoryPath(
"fsx::/root",
"192.16.10.2",
"sample_node_id",
"Q100",
"Task22",
true));

EXPECT_EQ(
"fs::/base/presto_native/2022-12-20/20221220-Q/Task1/",
std::make_tuple(
"fs::/base/presto_native/2022-12-20/20221220-Q/Task1/",
"fs::/base/presto_native/2022-12-20/"),
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "192.168.10.2", "19", "20221220-Q", "Task1", false));
EXPECT_EQ(
"fsx::/root/presto_native/1970-01-01/Q100/Task22/",
std::make_tuple(
"fsx::/root/presto_native/1970-01-01/Q100/Task22/",
"fsx::/root/presto_native/1970-01-01/"),
TaskManager::buildTaskSpillDirectoryPath(
"fsx::/root",
"192.16.10.2",
Expand Down

0 comments on commit adfb9c3

Please sign in to comment.