Skip to content

Commit

Permalink
add confirm step befor submit job
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianhao-Gu committed Apr 5, 2024
1 parent 2f2d45d commit 5016106
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions src/loaders/jobs/taskfarmer/task_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import shutil

import click

import src.loaders.jobs.taskfarmer.taskfarmer_common as tf_common
from src.loaders.common import loader_common_names
from src.loaders.common.loader_helper import make_collection_source_dir
Expand Down Expand Up @@ -45,7 +47,7 @@
# Used as THREADS variable in the batch script which controls the number of parallel tasks per node
TASKS_PER_NODE_DEFAULT = 1

THREADS_PER_TOOL_RUN_DEFAULT = 32
SYSTEM_CPU_CORES = 256 # number of CPU cores available on the NERSC nodes

# Task metadata - tool specific parameters for task generation and execution
# chunk_size is the quantity of genomes processed within each task (default is 5000)
Expand All @@ -58,7 +60,7 @@
# tasks_per_node is the number of parallel tasks to run on a node (default is 1)
# node_time_limit is the time limit for the node we reserved for the task (default is 5 hours)
# if no specific metadata is provided for a tool, the default values are used.
TASK_META = {'gtdb_tk': {'chunk_size': 1000, 'exe_time': 65, 'tasks_per_node': 4},
TASK_META = {'gtdb_tk': {'chunk_size': 1000, 'exe_time': 65, 'tasks_per_node': 4, 'threads_per_tool_run': 32},
'eggnog': {'chunk_size': 100, 'exe_time': 15, 'node_time_limit': 0.5}, # Memory intensive tool - reserve more nodes with less node reservation time
'default': {'chunk_size': 5000, 'exe_time': 60}}
MAX_NODE_NUM = 100 # maximum number of nodes to use
Expand Down Expand Up @@ -189,6 +191,21 @@ def _create_genome_id_file(genome_ids, genome_id_file):
_write_to_file(genome_id_file, content)


def _get_threads_per_tool_run(tool: str) -> int:
"""
Get the number of threads to use for each tool execution
"""

# If the tool has a specific number of threads per tool run, use that
threads_per_tool_run = TASK_META.get(tool, TASK_META['default']).get('threads_per_tool_run')
if threads_per_tool_run:
return threads_per_tool_run

Check warning on line 202 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L200-L202

Added lines #L200 - L202 were not covered by tests

threads_per_tool_run = SYSTEM_CPU_CORES // _get_node_task_count(tool)

Check warning on line 204 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L204

Added line #L204 was not covered by tests

return threads_per_tool_run

Check warning on line 206 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L206

Added line #L206 was not covered by tests


def _create_task_list(
env: str,
kbase_collection: str,
Expand All @@ -207,7 +224,7 @@ def _create_task_list(
os.path.isdir(os.path.join(source_data_dir, path))]

chunk_size = TASK_META.get(tool, TASK_META['default'])['chunk_size']
threads_per_tool_run = TASK_META.get(tool, TASK_META['default']).get('threads_per_tool_run', THREADS_PER_TOOL_RUN_DEFAULT)
threads_per_tool_run = _get_threads_per_tool_run(tool)

Check warning on line 227 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L227

Added line #L227 was not covered by tests
genome_ids_chunks = [genome_ids[i: i + chunk_size] for i in range(0, len(genome_ids), chunk_size)]

vol_mounts = _retrieve_tool_volume(tool, root_dir)
Expand Down Expand Up @@ -315,7 +332,7 @@ def _create_batch_script(job_dir, task_list_file, n_jobs, tool):
batch_script_file = os.path.join(job_dir, tf_common.BATCH_SCRIPT)
_write_to_file(batch_script_file, batch_script)

return batch_script_file
return batch_script_file, node_num

Check warning on line 335 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L335

Added line #L335 was not covered by tests


def _create_job_dir(job_dir, destroy_old_job_dir=False):
Expand Down Expand Up @@ -410,14 +427,20 @@ def main():
root_dir,
source_file_ext=source_file_ext)

batch_script = _create_batch_script(job_dir, task_list_file, n_jobs, tool)
batch_script, node_num = _create_batch_script(job_dir, task_list_file, n_jobs, tool)

Check warning on line 430 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L430

Added line #L430 was not covered by tests

if args.submit_job:
try:
task_mgr.submit_job()
except PreconditionError as e:
raise ValueError(f'Error submitting job:\n{e}\n'
f'Please use the --force flag to overwrite the previous run.') from e
confirmation = click.confirm(f"The task will reserve {node_num} nodes, "

Check warning on line 433 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L433

Added line #L433 was not covered by tests
f"with a maximum total runtime of {node_num * _get_node_time_limit(tool)} hours.\n"
f"Please confirm to proceed.")
if confirmation:
try:
task_mgr.submit_job()
except PreconditionError as e:
raise ValueError(f'Error submitting job:\n{e}\n'

Check warning on line 440 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L436-L440

Added lines #L436 - L440 were not covered by tests
f'Please use the --force flag to overwrite the previous run.') from e
else:
print('Job submission cancelled.')

Check warning on line 443 in src/loaders/jobs/taskfarmer/task_generator.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/jobs/taskfarmer/task_generator.py#L443

Added line #L443 was not covered by tests
else:
print(f'Please go to Job Directory: {job_dir} and submit the batch script: {batch_script} to the scheduler.')

Expand Down

0 comments on commit 5016106

Please sign in to comment.