Skip to content

Commit

Permalink
[native] Update code that sets up the task spill directory.
Browse files Browse the repository at this point in the history
  • Loading branch information
spershin committed Dec 21, 2022
1 parent 017d65c commit db015ec
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 67 deletions.
104 changes: 38 additions & 66 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@ namespace facebook::presto {

namespace {

// If spilling is enabled and the given Task can spill, then this helper
// generates the spilling directory path for the Task, creates that directory in
// the file system and sets the path to it to the Task.
static void maybeSetupTaskSpillDirectory(
const core::PlanFragment& planFragment,
exec::Task& execTask) {
const auto baseSpillPath = SystemConfig::instance()->spillerSpillPath();
if (!baseSpillPath.empty() &&
planFragment.canSpill(execTask.queryCtx()->queryConfig())) {
const auto taskSpillDirPath = TaskManager::buildTaskSpillDirectoryPath(
baseSpillPath, execTask.queryCtx()->queryId(), execTask.taskId());
execTask.setSpillDirectory(taskSpillDirPath);
// Create folder for the task spilling.
auto fileSystem =
velox::filesystems::getFileSystem(taskSpillDirPath, nullptr);
VELOX_CHECK_NOT_NULL(fileSystem, "File System is null!");
fileSystem->mkdir(taskSpillDirPath);
}
}

bool isFinalState(protocol::TaskState state) {
switch (state) {
case protocol::TaskState::FINISHED:
Expand Down Expand Up @@ -174,75 +194,29 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
return std::make_unique<TaskInfo>(info);
}

// If the plan fragment contains operator that can spill and spilling is
// enabled, created the spilling directory for the task.
static std::string maybeSetupTaskSpillDirectory(
const velox::core::QueryCtx* queryCtx,
const TaskId& taskId,
const velox::core::PlanFragment& planFragment) {
if (not queryCtx->queryConfig().spillEnabled()) {
return "";
}

const auto baseSpillPath = SystemConfig::instance()->spillerSpillPath();
if (baseSpillPath.empty()) {
return "";
}

// Functors to check if a given node can spill.
const auto hasSpillingHashAggrFunc = [](const core::PlanNode* node) {
if (node->name() == "Aggregation") {
if (const auto* aggregationNode =
dynamic_cast<const core::AggregationNode*>(node)) {
if (aggregationNode->isFinal() || aggregationNode->isSingle()) {
return true;
}
}
}
return false;
};
const auto hasSpillingOrderByFunc = [](const core::PlanNode* node) {
return (node->name() == "OrderBy");
};
const auto hasSpillingHashJoinFunc = [](const core::PlanNode* node) {
return (node->name() == "HashJoin");
};
/*static*/ std::string TaskManager::buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& queryId,
const protocol::TaskId& taskId) {
// Generate 'YYYY-MM-DD' from the query ID, which starts with 'YYYYMMDD'.
// In case query id is malformed (should not be the case in production) we
// fall back to the predefined date.
const std::string dateString = (queryId.size() >= 8)
? fmt::format(
"{}-{}-{}",
queryId.substr(0, 4),
queryId.substr(4, 2),
queryId.substr(6, 2))
: "1970-01-01";

// Determine if this task can spill at all, to avoid creating a folder for
// nothing. Looks for particular nodes for that.
const auto* topNode = planFragment.planNode.get();
if ((queryCtx->queryConfig().aggregationSpillEnabled() &&
core::PlanNode::findFirstNode(topNode, hasSpillingHashAggrFunc)) ||
(queryCtx->queryConfig().orderBySpillEnabled() &&
core::PlanNode::findFirstNode(topNode, hasSpillingOrderByFunc)) ||
(queryCtx->queryConfig().joinSpillEnabled() &&
core::PlanNode::findFirstNode(topNode, hasSpillingHashJoinFunc))) {
// Might need spilling.
} else {
return "";
}

// Form the task spill path.
const std::string& queryId = queryCtx->queryId();
std::stringstream ss;
ss << baseSpillPath << "/";
// Generate 'YYYY-MM-DD' from the query ID, which starts with 'YYYYMMDD'.
ss << fmt::format(
"{}-{}-{}",
queryId.substr(0, 4),
queryId.substr(4, 2),
queryId.substr(6, 2));
ss << dateString;
ss << "/";
// TODO(spershin): We will like need to use identity (from config?) in the
// long run. Use 'presto_native' for now.
ss << "presto_native" << queryId << "/" << taskId << "/";
const std::string spillPath = ss.str();

// Create folder for the task spilling.
auto fileSystem = velox::filesystems::getFileSystem(spillPath, nullptr);
VELOX_CHECK_NOT_NULL(fileSystem, "File System is null!");
fileSystem->mkdir(spillPath);
return spillPath;
ss << "presto_native/" << queryId << "/" << taskId << "/";
return ss.str();
}

std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
Expand Down Expand Up @@ -280,11 +254,9 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
concurrentLifespans = std::numeric_limits<uint32_t>::max();
}

const auto taskSpillDir =
maybeSetupTaskSpillDirectory(queryCtx.get(), taskId, planFragment);
execTask = std::make_shared<exec::Task>(
taskId, planFragment, 0, std::move(queryCtx));
execTask->setSpillDirectory(taskSpillDir);
maybeSetupTaskSpillDirectory(planFragment, *execTask);

prestoTask->task = execTask;
prestoTask->info.needsPlan = false;
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ class TaskManager {
// in exec/Task.h).
std::array<size_t, 5> getTaskNumbers(size_t& numTasks) const;

/// Build directory path for spilling for the given task.
/// Always returns non-empty string.
static std::string buildTaskSpillDirectoryPath(
const std::string& baseSpillPath,
const std::string& queryId,
const protocol::TaskId& taskId);

public:
static constexpr folly::StringPiece kMaxDriversPerTask{
"max_drivers_per_task"};
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,4 +839,14 @@ TEST_F(TaskManagerTest, aggregationSpill) {
}
}

TEST_F(TaskManagerTest, buildTaskSpillDirectoryPath) {
EXPECT_EQ(
"fs::/base/2022-12-20/presto_native/20221220-Q/Task1/",
TaskManager::buildTaskSpillDirectoryPath(
"fs::/base", "20221220-Q", "Task1"));
EXPECT_EQ(
"fsx::/root/1970-01-01/presto_native/Q100/Task22/",
TaskManager::buildTaskSpillDirectoryPath("fsx::/root", "Q100", "Task22"));
}

// TODO: add disk spilling test for order by and hash join later.

0 comments on commit db015ec

Please sign in to comment.