
Merge pull request #695 from kbase/develop
Develop -> Main (0.1.2 release)
MrCreosote authored Mar 26, 2024
2 parents af5fca9 + 6c221f1 commit 7ff4f9a
Showing 18 changed files with 679 additions and 684 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/build-push-eggnog-image.yml
@@ -0,0 +1,31 @@
name: Build & Push eggNOG Image to GHCR

on:
  pull_request:
    types:
      - opened
      - reopened
      - synchronize
      - ready_for_review
    paths:
      - 'src/loaders/compute_tools/eggnog/versions.yaml'
      - '.github/workflows/build-push-eggnog-image.yml'
      - '.github/workflows/build-push-tool-images.yml'

  push:
    branches:
      - main
      - master
      - develop
    paths:
      - 'src/loaders/compute_tools/eggnog/versions.yaml'
      - '.github/workflows/build-push-eggnog-image.yml'
      - '.github/workflows/build-push-tool-images.yml'

jobs:
  trigger-build-push:
    uses: ./.github/workflows/build-push-tool-images.yml
    with:
      tool_name: eggnog
      version_file: 'src/loaders/compute_tools/eggnog/versions.yaml'
    secrets: inherit
8 changes: 8 additions & 0 deletions RELEASE_NOTES.md
@@ -1,5 +1,13 @@
# KBase Collections Release Notes

## 0.1.2

* Fixed a bug that caused requests with filters to fail for filter keys containing colons.
* Added eggNOG mapper as an experimental tool in the pipeline. It does not yet integrate
  into the collections pipeline proper.
* Pipeline tools now take the number of threads to use for tool invocations in their argument list.
* The `--create_assembly_only` CLI flag was removed from the workspace uploader.

## 0.1.1

* Fixed a bug in the service manager script that would cause it to fail if the service had never
2 changes: 1 addition & 1 deletion src/common/version.py
@@ -2,4 +2,4 @@
The version of the KBase collections software.
'''

VERSION = "0.1.1"
VERSION = "0.1.2"
3 changes: 3 additions & 0 deletions src/loaders/common/loader_common_names.py
@@ -94,6 +94,9 @@
# The metadata file name created during the Mash run
MASH_METADATA = 'mash_run_metadata.json'

# The metadata file name created during the Eggnog run
EGGNOG_METADATA = 'eggnog_run_metadata.json'

# The fatal error file created if a data file cannot be successfully processed
FATAL_ERROR_FILE = "fatal_error.json"

34 changes: 34 additions & 0 deletions src/loaders/compute_tools/eggnog/Dockerfile
@@ -0,0 +1,34 @@
FROM continuumio/miniconda3:24.1.2-0

ENV EGGNOG_VER 2.1.12
ENV CONDA_ENV eggnog-$EGGNOG_VER
ENV PYTHON_VER 3.11

RUN conda config --add channels bioconda
RUN conda config --add channels conda-forge

RUN conda create -n $CONDA_ENV python=$PYTHON_VER
RUN conda install -n $CONDA_ENV -c conda-forge -c bioconda eggnog-mapper=$EGGNOG_VER
RUN conda install -n $CONDA_ENV pandas=2.2.1 jsonlines=2.0.0

RUN echo "source activate $CONDA_ENV" >> ~/.bashrc

# eggNOG annotation DB is pre-downloaded at /global/cfs/cdirs/kbase/collections/libraries/eggnog/5.0.2
# following instructions at https://github.com/eggnogdb/eggnog-mapper/wiki/eggNOG-mapper-v2.1.5-to-v2.1.12#setup
# Mount the annotation DB directory to /reference_data when running the container
ENV EGGNOG_DATA_DIR /reference_data

RUN mkdir -p /app
COPY ./ /app/collections
# The .git directory slows down the chmod step below if left in place
RUN rm -r /app/collections/.git

ENV PYTHONPATH /app/collections

WORKDIR /app

ENV PY_SCRIPT=/app/collections/src/loaders/compute_tools/eggnog/eggnog.py

RUN chmod -R 777 /app/collections

ENTRYPOINT ["/app/collections/src/loaders/compute_tools/entrypoint.sh"]
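As a usage note (not part of this diff): the comments above say the annotation DB must be mounted at /reference_data at run time. A minimal invocation sketch follows; the image tag format and use of subprocess are assumptions for illustration, while the host DB path is taken from the Dockerfile comment.

import subprocess

# Hypothetical sketch: run the eggNOG image with the pre-downloaded annotation DB
# mounted at /reference_data, as the Dockerfile above requires.
# 'ghcr.io/kbase/collections:eggnog-0.1.1' is an assumed tag for illustration.
subprocess.run(
    [
        'docker', 'run', '--rm',
        '-v', '/global/cfs/cdirs/kbase/collections/libraries/eggnog/5.0.2:/reference_data',
        'ghcr.io/kbase/collections:eggnog-0.1.1',
    ],
    check=True,
)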
10 changes: 10 additions & 0 deletions src/loaders/compute_tools/eggnog/README.md
@@ -0,0 +1,10 @@

# eggNOG tool

## Overview
The eggNOG tool is designed to utilize the collections infrastructure for execution and storage of result data.

This tool is exclusively intended for use with the CDM project.

The Collections parser program ([parse_tool_results.py](../../genome_collection/parse_tool_results.py)) will skip parsing the result files generated by this tool, as the result data is specifically tailored for the CDM project.
58 changes: 58 additions & 0 deletions src/loaders/compute_tools/eggnog/eggnog.py
@@ -0,0 +1,58 @@
"""
Run eggNOG tool on a set of faa files.
This tool serves a distinct purpose separate from collection tools; instead, it is suited for CDM work.
Therefore, the parser program is not compatible with data generated by this tool.
"""
import json
from pathlib import Path

from src.loaders.common.loader_common_names import EGGNOG_METADATA
from src.loaders.compute_tools.tool_common import ToolRunner, run_command

INPUT_TYPE = 'proteins'


def _run_eggnog_single(
        tool_safe_data_id: str,
        data_id: str,
        source_file: Path,
        output_dir: Path,
        program_threads: int,
        debug: bool) -> None:

    metadata_file = output_dir / EGGNOG_METADATA
    if metadata_file.exists():
        print(f"Skipping {source_file} as it has already been processed.")
        return

    # RUN eggNOG for a single genome
    command = ['emapper.py',
               '-i', source_file,  # Input file.
               '-o', output_dir / source_file.name,  # Output prefix.
               # Save result file to collectiondata directory. Expecting 'emapper.annotations',
               # 'emapper.hits' and 'emapper.seed_orthologs' files.
               '--itype', f'{INPUT_TYPE}',
               '--cpu', f'{program_threads}',
               '--excel',
               '--sensmode', 'fast',
               '--dmnd_iterate', 'no',
               '--override'  # Overwrites output files if they exist from previous runs.
               ]

    run_command(command, output_dir if debug else None)

    # Save run info to a metadata file in the output directory for parsing later
    metadata = {'source_file': str(source_file),
                'input_type': INPUT_TYPE}
    with open(metadata_file, 'w') as f:
        json.dump(metadata, f, indent=4)


def main():
    runner = ToolRunner("eggnog")
    runner.parallel_single_execution(_run_eggnog_single, unzip=True)


if __name__ == "__main__":
    main()
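Note: run_command is imported from tool_common.py, which is only partially shown in this diff. A minimal sketch of a helper with the call shape used above, assuming it wraps subprocess and writes logs to the given directory in debug mode, might look like:

import subprocess
from pathlib import Path
from typing import Optional


def run_command(command: list, log_dir: Optional[Path] = None) -> None:
    # Hypothetical sketch only; the real helper lives in
    # src/loaders/compute_tools/tool_common.py and may differ.
    command = [str(c) for c in command]  # the caller mixes str and Path arguments
    if log_dir:
        log_dir.mkdir(parents=True, exist_ok=True)
        with open(log_dir / 'stdout.txt', 'w') as out, open(log_dir / 'stderr.txt', 'w') as err:
            subprocess.run(command, stdout=out, stderr=err, check=True)
    else:
        subprocess.run(command, check=True)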
10 changes: 10 additions & 0 deletions src/loaders/compute_tools/eggnog/versions.yaml
@@ -0,0 +1,10 @@
versions:
  - version: 0.1.0
    date: 2024-03-13
    reference_db_version: 5.0.2

  - version: 0.1.1
    date: 2024-03-15
    notes: |
      - add ability to specify thread number for execution
    reference_db_version: 5.0.2
2 changes: 2 additions & 0 deletions src/loaders/compute_tools/mash/mash.py
@@ -16,6 +16,7 @@ def _run_mash_single(
        data_id: str,
        source_file: Path,
        output_dir: Path,
        program_threads: int,
        debug: bool,
        kmer_size: int = KMER_SIZE,
        sketch_size: int = SKETCH_SIZE) -> None:
@@ -25,6 +26,7 @@
               # Save result file to source file directory. The suffix '.msh' will be appended.
               '-k', f'{kmer_size}',
               '-s', f'{sketch_size}',
               '-p', f'{program_threads}',
               source_file]

    run_command(command, output_dir if debug else None)
6 changes: 5 additions & 1 deletion src/loaders/compute_tools/mash/versions.yaml
@@ -2,4 +2,8 @@ versions:
  - version: 0.1.0
    date: 2023-07-18
  - version: 0.1.1
    date: 2023-07-19
  - version: 0.1.2
    date: 2024-03-15
    notes: |
      - add ability to specify thread number for execution
12 changes: 11 additions & 1 deletion src/loaders/compute_tools/microtrait/microtrait.py
@@ -186,7 +186,13 @@ def _process_trait_counts(
    return heatmap_row, cells_meta, traits_meta


def _run_microtrait(tool_safe_data_id: str, data_id: str, fna_file: Path, genome_dir: Path, debug: bool):
def _run_microtrait(
        tool_safe_data_id: str,
        data_id: str,
        fna_file: Path,
        genome_dir: Path,
        program_threads: int,
        debug: bool):
    # run microtrait.extract_traits on the genome file
    # https://github.com/ukaraoz/microtrait

@@ -198,6 +204,10 @@ def _run_microtrait(tool_safe_data_id: str, data_id: str, fna_file: Path, genome
    # object returned by the
    # extract_traits function.

    # program_threads is not used in this function, but it is kept for consistency with other
    # tools (e.g., eggnog, mash), since the extract_traits function doesn't take the number of
    # threads as an argument:
    # https://github.com/ukaraoz/microtrait/blob/master/R/extract_traits.R#L22-L26

    # Load the R script as an R function
    r_script = """
    library(microtrait)
5 changes: 3 additions & 2 deletions src/loaders/compute_tools/tool_common.py
@@ -246,7 +246,7 @@ def _get_data_ids(self):
            data_ids = all_data_ids
        return list(set(data_ids))

    def parallel_single_execution(self, tool_callable: Callable[[str, str, Path, Path, bool], None], unzip=False):
    def parallel_single_execution(self, tool_callable: Callable[[str, str, Path, Path, int, bool], None], unzip=False):
        """
        Run a tool by a single data file, storing the results in a single batch directory with
        the individual runs stored in directories by the data ID.
@@ -298,6 +298,7 @@ def parallel_single_execution(self, tool_callable: Callable[[str, str, Path, Pat
                        meta.get(loader_common_names.META_UNCOMPRESSED_FILE,
                                 meta[loader_common_names.META_SOURCE_FILE]),
                        output_dir,
                        self._program_threads,
                        self._debug))

        try:
@@ -378,7 +379,7 @@ def _execute(
            self,
            threads: int,
            tool_callable: Callable[..., None],
            args: List[Tuple[Dict[str, GenomeTuple], Path, int, bool]],
            args: List[Tuple[Dict[str, GenomeTuple], Path, int, bool]] | List[Tuple[str, str, Path, Path, int, bool]],
            start: datetime.datetime,
            total: bool,
    ):
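With this change, any per-data-file callable passed to parallel_single_execution takes six positional arguments, matching the updated Callable type. A minimal conforming stub (hypothetical names; see eggnog.py and mash.py in this diff for real implementations):

from pathlib import Path


def _run_tool_single(
        tool_safe_data_id: str,  # data ID sanitized for file system use
        data_id: str,            # original data ID
        source_file: Path,       # input data file
        output_dir: Path,        # directory for result files
        program_threads: int,    # threads per tool invocation (added in this release)
        debug: bool) -> None:
    pass  # invoke the tool here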
76 changes: 62 additions & 14 deletions src/loaders/jobs/taskfarmer/task_generator.py
@@ -42,23 +42,23 @@
To reflect this, we have set the "threads" and "program_threads" parameters in "_create_task_list" to 32,
indicating that each batch will use 32 cores. We have also set the execution time for GTDB-Tk to 65 minutes,
with some additional buffer time. To allow for a sufficient number of batches to be run within a given time limit,
we have set the "NODE_TIME_LIMIT" to 5 hours. With these settings, we expect to be able to process up to 16 batches,
we have set the "NODE_TIME_LIMIT_DEFAULT" to 5 hours. With these settings, we expect to be able to process up to 16 batches,
or 16,000 genomes, per node within the 5-hour time limit. We plan to make these parameters configurable based on
the specific tool being used. After conducting performance tests, we found that utilizing 32 cores per batch and
running 4 batches in parallel per NERSC node resulted in optimal performance, despite each node having a total of
256 cores.
'''

TOOLS_AVAILABLE = ['gtdb_tk', 'checkm2', 'microtrait', 'mash']
TOOLS_AVAILABLE = ['gtdb_tk', 'checkm2', 'microtrait', 'mash', 'eggnog']

# estimated execution time (in minutes) for each tool to process a chunk of data
TASK_META = {'gtdb_tk': {'chunk_size': 1000, 'exe_time': 65},
TASK_META = {'gtdb_tk': {'chunk_size': 1000, 'exe_time': 65, 'tasks_per_node': 4},
             # Memory-intensive tool: reserve more nodes, each with a shorter node reservation time
             'eggnog': {'chunk_size': 100, 'exe_time': 15, 'node_time_limit': 0.5},
             'default': {'chunk_size': 5000, 'exe_time': 60}}
NODE_TIME_LIMIT = 5 # hours # TODO: automatically calculate this based on tool execution time and NODE_THREADS
NODE_TIME_LIMIT_DEFAULT = 5 # hours # TODO: automatically calculate this based on tool execution time and NODE_THREADS
MAX_NODE_NUM = 100 # maximum number of nodes to use
# The THREADS variable controls the number of parallel tasks per node
# TODO: make this configurable based on tool used. At present, we have set the value to 4 for optimal performance with GTDB-Tk.
NODE_THREADS = 4
# Used as the THREADS variable in the batch script, which controls the number of parallel tasks per node
TASKS_PER_NODE_DEFAULT = 1

REGISTRY = 'ghcr.io/kbase/collections'
VERSION_FILE = 'versions.yaml'
@@ -204,11 +204,29 @@ def _create_task_list(
        program_threads: number of threads to use per task
            For instance, if "threads" is set to 128 and "program_threads" to 32, then each task will run 4 batches in parallel.
            We have chosen to use "taskfarmer" for parallelization, which means that "threads" and "program_threads" should
            have the same value. This ensures that parallelization only happens between tasks, and not within them.

            For tools capable of processing a batch of genomes, such as GTDB-Tk and CheckM2, we use the Slurm
            THREADS variable within the TaskFarmer batch script (submit_taskfarmer.sl) for tool parallelization.
            From here on we will call this value 'tasks' rather than 'threads' to avoid confusion with operating
            system threads. It determines the number of simultaneous tool executions per node.
            'program_threads' is configured to match the CPU count per task execution. To ensure each task handles
            only one tool execution at a time, 'threads' and 'program_threads' should be set to the same value,
            since the task generator program divides 'threads' by 'program_threads' to determine the number of
            parallel executions within a task. Setting 'threads' and 'program_threads' to the same number thus
            ensures that parallelization of tool executions only happens between tasks, and not within tasks.
            Each tool execution is specified to use 'program_threads' operating system threads.

            For tools that can only handle genomes individually, such as microtrait, mash and eggNOG, we still
            use the THREADS variable for task parallelization. However, within each task, we further parallelize
            tool execution using the 'threads' variable. For instance, with 'threads' set to 16 and
            'tasks_per_node' set to 4, each node will concurrently execute 4 tasks, with each task executing 16
            parallel tool operations. Each tool will also use 'program_threads' operating system threads, so in
            the current example, if 'program_threads' is set to 8, the total number of operating system threads
            will be 4 * 16 * 8 = 512. This is a significant difference from the batch tasks described above.

    TODO: make threads/program_threads configurable based on tool used. However, for the time being, we have set
        these parameters to 32 and , since this value has produced the highest throughput in our experiments.
        these parameters to 32, since this value has produced the highest throughput for GTDB-TK in our experiments.
    """
    source_data_dir = make_collection_source_dir(root_dir, env, kbase_collection, source_ver)
    genome_ids = [path for path in os.listdir(source_data_dir) if
@@ -258,9 +276,9 @@ def _cal_node_num(tool, n_jobs):
"""

tool_exe_time = TASK_META.get(tool, TASK_META['default'])['exe_time']
jobs_per_node = NODE_TIME_LIMIT * 60 // tool_exe_time
max_jobs_per_node = max(_get_node_time_limit(tool) * 60 // tool_exe_time, 1) * _get_node_task_count(tool)

num_nodes = math.ceil(n_jobs / jobs_per_node)
num_nodes = math.ceil(n_jobs / max_jobs_per_node)

if num_nodes > MAX_NODE_NUM:
raise ValueError(f"The number of nodes required ({num_nodes}) is greater than the maximum "
@@ -269,6 +287,36 @@
    return num_nodes
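As a worked example of the arithmetic above, using the eggnog entry from TASK_META (exe_time of 15 minutes, node_time_limit of 0.5 hours, and the default tasks_per_node of 1) with a hypothetical 100 chunks:

import math

exe_time = 15          # minutes per chunk (TASK_META['eggnog']['exe_time'])
node_time_limit = 0.5  # hours per node reservation (TASK_META['eggnog']['node_time_limit'])
tasks_per_node = 1     # TASKS_PER_NODE_DEFAULT; the eggnog entry does not override it

max_jobs_per_node = max(node_time_limit * 60 // exe_time, 1) * tasks_per_node  # 2.0
n_jobs = 100  # hypothetical chunk count
print(math.ceil(n_jobs / max_jobs_per_node))  # 50 nodes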


def _get_node_task_count(tool: str) -> int:
    """
    Get the number of parallel tasks to run on a node.
    """

    return TASK_META.get(tool, TASK_META['default']).get('tasks_per_node', TASKS_PER_NODE_DEFAULT)


def _get_node_time_limit(tool: str) -> float:
    """
    Get the time limit for the node reserved for the task.

    By default, the time limit is set to NODE_TIME_LIMIT_DEFAULT (5 hours). If the tool has a
    different time limit (node_time_limit) in TASK_META, that value is used instead.
    """

    return TASK_META.get(tool, TASK_META['default']).get('node_time_limit', NODE_TIME_LIMIT_DEFAULT)


def _float_to_time(float_hours: float) -> str:
    """
    Convert a floating point number of hours to a time string in the format HH:MM:SS.
    """
    hours = int(float_hours)
    decimal_part = float_hours - hours
    minutes = int(decimal_part * 60)
    seconds = int(decimal_part * 3600) % 60
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
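A few spot checks of _float_to_time based on the logic above:

print(_float_to_time(0.5))   # 00:30:00 (eggnog's node_time_limit)
print(_float_to_time(5))     # 05:00:00 (NODE_TIME_LIMIT_DEFAULT)
print(_float_to_time(1.26))  # 01:15:36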


def _create_batch_script(job_dir, task_list_file, n_jobs, tool):
    """
    Create the batch script (submit_taskfarmer.sl) for job submission
@@ -280,13 +328,13 @@ def _create_batch_script(job_dir, task_list_file, n_jobs, tool):
#SBATCH -N {node_num + 1} -c 256
#SBATCH -q regular
#SBATCH --time={NODE_TIME_LIMIT}:00:00
#SBATCH --time={_float_to_time(_get_node_time_limit(tool))}
#SBATCH -C cpu
module load taskfarmer
cd {job_dir}
export THREADS={NODE_THREADS}
export THREADS={_get_node_task_count(tool)}
runcommands.sh {task_list_file}'''
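For the eggnog tool, the rendered script header would come out roughly as follows; node_num, job_dir, and task_list_file are hypothetical values, and the snippet mirrors only the template fragment shown above:

node_num, job_dir, task_list_file = 50, '/path/to/job_dir', 'tasks.txt'
print(f'''#SBATCH -N {node_num + 1} -c 256
#SBATCH -q regular
#SBATCH --time={_float_to_time(_get_node_time_limit('eggnog'))}
#SBATCH -C cpu

module load taskfarmer

cd {job_dir}
export THREADS={_get_node_task_count('eggnog')}

runcommands.sh {task_list_file}''')
# --time renders as 00:30:00 and THREADS as 1 for eggnog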
