
Merge pull request #711 from kbase/dev_taskfarmer_gen
RE2022-336: update tool containers to use threads_per_tool_run
Tianhao-Gu authored Apr 5, 2024
2 parents a73abd9 + c4ecf35 commit 2f2d45d
Showing 15 changed files with 77 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/loaders/common/loader_common_names.py
@@ -32,8 +32,8 @@
ENV_ARG_NAME = "env"

# Prefix for output directories generated by the compute tools.
- COMPUTE_OUTPUT_PREFIX = "batch_"
- COMPUTE_OUTPUT_NO_BATCH = "no_batch"
+ COMPUTE_OUTPUT_PREFIX = "job"
+ COMPUTE_OUTPUT_NO_BATCH = "_non_batch"

"""
File structure at NERSC for loader programs
4 changes: 2 additions & 2 deletions src/loaders/compute_tools/checkm2/checkm2.py
@@ -25,7 +25,7 @@
def _run_checkm2(
ids_to_files: Dict[str, GenomeTuple],
output_dir: Path,
- threads: int,
+ threads_per_tool_run: int,
debug: bool,
):
size = len(ids_to_files)
@@ -35,7 +35,7 @@ def _run_checkm2(
# RUN checkM2 predict
command = ['checkm2', 'predict',
'--output-directory', str(output_dir),
- '--threads', str(threads),
+ '--threads', str(threads_per_tool_run),
'--force', # will overwrite output directory contents
'--input'] + [str(v.source_file) for v in ids_to_files.values()]

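For illustration, a sketch of the argv this hunk builds with threads_per_tool_run=32 and two inputs; the output directory and input paths are hypothetical:

# Sketch only: mirrors the command list assembled in _run_checkm2 above;
# paths and the thread count are made up for illustration.
command = ['checkm2', 'predict',
           '--output-directory', '/work/job_0_size_2',
           '--threads', '32',
           '--force',
           '--input', '/data/a.fna', '/data/b.fna']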
7 changes: 7 additions & 0 deletions src/loaders/compute_tools/checkm2/versions.yaml
@@ -19,4 +19,11 @@ versions:
date: 2023-11-08
notes: |
- create fatal error file for CheckM2
reference_db_version: 1.0.1
+ - version: 0.1.4
+ date: 2024-04-05
+ notes: |
+ - Remove the 'threads' and 'program_threads' parameters
+ and introduce the capability to specify the number of threads per tool run
+ - pass job_id to the tool container and remove node_id
+ reference_db_version: 1.0.1
4 changes: 2 additions & 2 deletions src/loaders/compute_tools/eggnog/eggnog.py
@@ -19,7 +19,7 @@ def _run_eggnog_single(
data_id: str,
source_file: Path,
output_dir: Path,
- program_threads: int,
+ threads_per_tool_run: int,
debug: bool) -> None:

metadata_file = output_dir / EGGNOG_METADATA
@@ -33,7 +33,7 @@
'-o', output_dir / source_file.name, # Output prefix.
# Save result file to collectiondata directory. Expecting 'emapper.annotations', 'emapper.hits' and 'emapper.seed_orthologs' files.
'--itype', f'{INPUT_TYPE}',
- '--cpu', f'{program_threads}',
+ '--cpu', f'{threads_per_tool_run}',
'--excel',
'--sensmode', 'fast',
'--dmnd_iterate', 'no',
8 changes: 8 additions & 0 deletions src/loaders/compute_tools/eggnog/versions.yaml
@@ -7,4 +7,12 @@ versions:
date: 2024-03-15
notes: |
- add ability to specify thread number for execution
reference_db_version: 5.0.2

+ - version: 0.1.2
+ date: 2024-04-05
+ notes: |
+ - Remove the 'threads' and 'program_threads' parameters
+ and introduce the capability to specify the number of threads per tool run
+ - pass job_id to the tool container and remove node_id
+ reference_db_version: 5.0.2
7 changes: 3 additions & 4 deletions src/loaders/compute_tools/entrypoint.sh
@@ -11,9 +11,8 @@ $env_tool run -n $CONDA_ENV \
--env $ENV \
--kbase_collection $KBASE_COLLECTION \
--root_dir $ROOT_DIR \
- --threads $THREADS \
- --program_threads $PROGRAM_THREADS \
- --node_id $NODE_ID \
+ --threads_per_tool_run $THREADS_PER_TOOL_RUN \
+ --job_id $JOB_ID \
--debug \
--source_file_ext $SOURCE_FILE_EXT \
- --data_id_file $GENOME_ID_FILE # TODO DATA_ID rename to $DATA_ID_FILE for generality
+ --data_id_file $DATA_ID_FILE
4 changes: 2 additions & 2 deletions src/loaders/compute_tools/gtdb_tk/gtdb_tk.py
@@ -67,7 +67,7 @@ def _get_id_and_error_message_mapping(file_path: str):
def _run_gtdb_tk(
ids_to_files: Dict[str, GenomeTuple],
output_dir: Path,
- threads: int,
+ threads_per_tool_run: int,
debug: bool,
):
size = len(ids_to_files)
@@ -89,7 +89,7 @@ def _run_gtdb_tk(
'--batchfile', str(batch_file_path),
'--out_dir', str(output_dir),
'--force',
- '--cpus', str(threads),
+ '--cpus', str(threads_per_tool_run),
'--skip_ani_screen',
]
command.append('--debug') if debug else None
7 changes: 7 additions & 0 deletions src/loaders/compute_tools/gtdb_tk/versions.yaml
@@ -28,4 +28,11 @@ versions:
date: 2023-11-06
notes: |
- using --skip_ani_screen option to skip pre ANI screening step to improve performance
reference_db_version: release214
+ - version: 0.1.5
+ date: 2024-04-05
+ notes: |
+ - Remove the 'threads' and 'program_threads' parameters
+ and introduce the capability to specify the number of threads per tool run
+ - pass job_id to the tool container and remove node_id
+ reference_db_version: release214
4 changes: 2 additions & 2 deletions src/loaders/compute_tools/mash/mash.py
@@ -16,7 +16,7 @@ def _run_mash_single(
data_id: str,
source_file: Path,
output_dir: Path,
- program_threads: int,
+ threads_per_tool_run: int,
debug: bool,
kmer_size: int = KMER_SIZE,
sketch_size: int = SKETCH_SIZE) -> None:
@@ -26,7 +26,7 @@ def _run_mash_single(
# Save result file to source file directory. The suffix '.msh' will be appended.
'-k', f'{kmer_size}',
'-s', f'{sketch_size}',
- '-p', f'{program_threads}',
+ '-p', f'{threads_per_tool_run}',
source_file]

run_command(command, output_dir if debug else None)
8 changes: 7 additions & 1 deletion src/loaders/compute_tools/mash/versions.yaml
@@ -6,4 +6,10 @@ versions:
- version: 0.1.2
date: 2024-03-15
notes: |
- - add ability to specify thread number for execution
+ - add ability to specify thread number for execution
+ - version: 0.1.3
+ date: 2024-04-05
+ notes: |
+ - Remove the 'threads' and 'program_threads' parameters
+ and introduce the capability to specify the number of threads per tool run
+ - pass job_id to the tool container and remove node_id
4 changes: 2 additions & 2 deletions src/loaders/compute_tools/microtrait/microtrait.py
@@ -191,7 +191,7 @@ def _run_microtrait(
data_id: str,
fna_file: Path,
genome_dir: Path,
- program_threads: int,
+ threads_per_tool_run: int,
debug: bool):
# run microtrait.extract_traits on the genome file
# https://github.com/ukaraoz/microtrait
@@ -204,7 +204,7 @@
# object returned by the
# extract_traits function.

- # programe_threads is not used in this function, but it is kept for consistency with another tools (e.g., eggnog, mash)
+ # threads_per_tool_run is not used in this function, but it is kept for consistency with another tools (e.g., eggnog, mash)
# since extract_traits function doesn't take the number of threads as an argument
# https://github.com/ukaraoz/microtrait/blob/master/R/extract_traits.R#L22-L26

8 changes: 7 additions & 1 deletion src/loaders/compute_tools/microtrait/versions.yaml
@@ -12,4 +12,10 @@ versions:
- version: 0.1.3
date: 2023-10-16
notes: |
- - fix cells value data type
+ - fix cells value data type
+ - version: 0.1.4
+ date: 2024-04-05
+ notes: |
+ - Remove the 'threads' and 'program_threads' parameters
+ and introduce the capability to specify the number of threads per tool run
+ - pass job_id to the tool container and remove node_id
34 changes: 20 additions & 14 deletions src/loaders/compute_tools/tool_common.py
@@ -88,7 +88,7 @@ def __init__(
(default: /global/cfs/cdirs/kbase/collections)
--threads_per_tool_run THREADS_PER_TOOL_RUN
Number of threads to execute a single tool command.
- --node_id NODE_ID node ID for running job
+ --job_id JOB_ID job ID for running job
--debug Debug mode.
--data_id_file DATA_ID_FILE
tab separated file containing data ids for the running
@@ -130,7 +130,7 @@ def __init__(
self._threads_per_tool_run = args.threads_per_tool_run
self._debug = args.debug
self._data_id_file = args.data_id_file
- self._node_id = args.node_id
+ self._job_id = args.job_id
self._source_file_ext = args.source_file_ext
self._data_ids = self._get_data_ids()

@@ -183,7 +183,7 @@ def _parse_args(self):
help='Number of threads to execute a single tool command.'
)
optional.add_argument(
- '--node_id', type=str, default=str(uuid.uuid4()), help='node ID for running job'
+ '--job_id', type=str, default=str(uuid.uuid4()), help='job ID for running job'
)
optional.add_argument('--debug', action='store_true', help='Debug mode.')
optional.add_argument(
@@ -227,19 +227,20 @@ def _get_data_ids(self):

def _prepare_execution(
self,
- unzip: bool
+ unzip: bool,
+ batch_tool: bool,
) -> Tuple[Path, Dict[str, Dict[str, Union[str, Path]]], List[str]]:

batch_dir, genomes_meta = _prepare_tool(
self._work_dir,
- loader_common_names.COMPUTE_OUTPUT_NO_BATCH,
- self._node_id,
+ self._job_id,
self._data_ids,
self._source_data_dir,
self._source_file_ext,
self._allow_missing_files,
self._tool_data_id_from_filename,
self._suffix_ids,
+ batch_tool,
)

unzipped_files_to_delete = []
@@ -280,7 +281,7 @@ def parallel_single_execution(self, tool_callable: Callable[[str, str, Path, Pat
unzip - if True, unzip the input file before passing it to the tool callable. (unzipped file will be deleted)
"""
start = time.time()
- batch_dir, genomes_meta, unzipped_files_to_delete = self._prepare_execution(unzip)
+ batch_dir, genomes_meta, unzipped_files_to_delete = self._prepare_execution(unzip, batch_tool=False)

args_list = []
for data_id, meta in genomes_meta.items():
@@ -322,7 +323,7 @@ def parallel_batch_execution(self, tool_callable: Callable[[Dict[str, GenomeTupl
* A debug boolean
"""
start = time.time()
- batch_dir, genomes_meta, unzipped_files_to_delete = self._prepare_execution(unzip)
+ batch_dir, genomes_meta, unzipped_files_to_delete = self._prepare_execution(unzip, batch_tool=True)

ids_to_files = dict()
for data_id, m in genomes_meta.items():
@@ -343,7 +344,7 @@ def parallel_batch_execution(self, tool_callable: Callable[[Dict[str, GenomeTupl
def _execute(
self,
tool_callable: Callable[..., None],
- args: List[Tuple[Dict[str, GenomeTuple], Path, int, bool]] | List[Tuple[str, str, Path, Path, int, bool]],
+ args: Union[List[Tuple[Dict[str, GenomeTuple], Path, int, bool]], List[Tuple[str, str, Path, Path, int, bool]]],
start: datetime.datetime,
total: bool,
):
@@ -449,21 +450,26 @@ def create_metadata_file(
# other ways. Meh
def _prepare_tool(
work_dir: Path,
- batch_id: str,
- node_id: str,
+ job_id: str,
data_ids: List[str],
source_data_dir: Path,
source_file_ext: str,
allow_missing_files: bool,
data_id_from_filename: bool,
suffix_ids: bool,
+ batch_tool: bool,
) -> Tuple[Path, Dict[str, Dict[str, Union[str, Path]]]]:
# Prepares for tool execution by creating a batch directory and retrieving data
# files with associated metadata.

- # create working directory for each batch
- batch_dir = work_dir / (f'{loader_common_names.COMPUTE_OUTPUT_PREFIX}{batch_id}_size_'
- + f'{len(data_ids)}_node_{node_id}')
+ # create a working directory for each job
+ # parser program will need `COMPUTE_OUTPUT_PREFIX` as dir name prefix to identify compute directories
+ # and `COMPUTE_OUTPUT_NO_BATCH` as dir name suffix to identify non-batch directories
+ batch_job_dir_identifier = '' if batch_tool else loader_common_names.COMPUTE_OUTPUT_NO_BATCH
+ batch_dir_name = (f'{loader_common_names.COMPUTE_OUTPUT_PREFIX}_{job_id}'
+ f'_size_{len(data_ids)}{batch_job_dir_identifier}')
+ batch_dir = work_dir / batch_dir_name

os.makedirs(batch_dir, exist_ok=True)

# Retrieve genome files and associated metadata for each genome ID
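The comments above spell out the contract with the results parser: directory names must start with COMPUTE_OUTPUT_PREFIX and, for non-batch tools, end with COMPUTE_OUTPUT_NO_BATCH. A minimal sketch of the names the new f-string produces, using the constants from loader_common_names.py in this diff; batch_dir_name is an illustrative helper, and the job ID and data counts are made up:

# Sketch only: reproduces the naming logic above; job ID and counts are illustrative.
COMPUTE_OUTPUT_PREFIX = 'job'
COMPUTE_OUTPUT_NO_BATCH = '_non_batch'

def batch_dir_name(job_id: str, n_data_ids: int, batch_tool: bool) -> str:
    suffix = '' if batch_tool else COMPUTE_OUTPUT_NO_BATCH
    return f'{COMPUTE_OUTPUT_PREFIX}_{job_id}_size_{n_data_ids}{suffix}'

print(batch_dir_name('0', 100, batch_tool=True))   # job_0_size_100
print(batch_dir_name('0', 100, batch_tool=False))  # job_0_size_100_non_batch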
6 changes: 4 additions & 2 deletions src/loaders/genome_collection/parse_tool_results.py
@@ -323,10 +323,12 @@ def _process_fatal_error_tools(check_fatal_error_tools: set[str],
for tool in check_fatal_error_tools:
result_dir = _locate_dir(root_dir, env, kbase_collection, load_ver, tool=tool)
batch_dirs = _get_batch_dirs(result_dir)
- batch_no_batch_prefix = loader_common_names.COMPUTE_OUTPUT_PREFIX + loader_common_names.COMPUTE_OUTPUT_NO_BATCH
- if len(batch_dirs) == 1 and batch_dirs[0].startswith(batch_no_batch_prefix):
+
+ # For non-batched tools, such as mash, retrieve individual genome subdirectories
+ if len(batch_dirs) == 1 and loader_common_names.COMPUTE_OUTPUT_NO_BATCH in batch_dirs[0]:
batch_dirs = [os.path.join(batch_dirs[0], d) for d in os.listdir(os.path.join(result_dir, batch_dirs[0]))
if os.path.isdir(os.path.join(result_dir, batch_dirs[0], d))]

for batch_dir in batch_dirs:
data_dir = os.path.join(result_dir, batch_dir)
fatal_error_file = os.path.join(data_dir, loader_common_names.FATAL_ERROR_FILE)
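A sketch of what the rewritten branch checks, with directory names following _prepare_tool above; is_non_batch is an illustrative helper, not a function in the codebase, and the example names are made up:

# Sketch only: batch tools (e.g. checkm2, gtdb_tk) write results directly into
# the job directory, while non-batch tools (e.g. mash) nest one subdirectory per
# genome under a single '..._non_batch' job directory, so the parser must descend.
COMPUTE_OUTPUT_NO_BATCH = '_non_batch'

def is_non_batch(batch_dirs: list) -> bool:
    return len(batch_dirs) == 1 and COMPUTE_OUTPUT_NO_BATCH in batch_dirs[0]

print(is_non_batch(['job_3_size_25_non_batch']))  # True  -> list genome subdirs
print(is_non_batch(['job_3_size_25']))            # False -> use dirs as-is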
4 changes: 2 additions & 2 deletions src/loaders/jobs/taskfarmer/task_generator.py
@@ -225,8 +225,8 @@ def _create_task_list(
'SOURCE_VER': source_ver,
'LOAD_VER': load_ver,
'ROOT_DIR': root_dir,
- 'NODE_ID': f'job_{idx}',
- 'GENOME_ID_FILE': genome_id_file,
+ 'JOB_ID': idx,
+ 'DATA_ID_FILE': genome_id_file,
'THREADS_PER_TOOL_RUN': threads_per_tool_run,
'SOURCE_FILE_EXT': source_file_ext}

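Putting the renames together, a hedged sketch of one task's environment after this change; the keys come from this hunk and entrypoint.sh above, and every value is illustrative:

# Illustrative values only; keys match entrypoint.sh and this hunk.
idx = 0
genome_id_file = 'genome_id_x00'  # hypothetical data ID chunk file
task_env = {
    'ENV': 'CI',                                        # illustrative
    'KBASE_COLLECTION': 'GTDB',                         # illustrative
    'SOURCE_VER': '214',                                # illustrative
    'LOAD_VER': '214.1',                                # illustrative
    'ROOT_DIR': '/global/cfs/cdirs/kbase/collections',  # default from tool_common.py
    'JOB_ID': idx,                                      # was 'NODE_ID': f'job_{idx}'
    'DATA_ID_FILE': genome_id_file,                     # was 'GENOME_ID_FILE'
    'THREADS_PER_TOOL_RUN': 32,                         # replaces THREADS / PROGRAM_THREADS
    'SOURCE_FILE_EXT': '.fna.gz',                       # illustrative
}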
