From 6038bc7a9bfded91815e3afea73a58b7b233587b Mon Sep 17 00:00:00 2001 From: Dominik Krupke Date: Sun, 25 Feb 2024 18:52:00 +0100 Subject: [PATCH] v0.10.0 --- README.rst | 29 +++--- examples/example_1.py | 4 +- examples/example_3.py | 2 +- pyproject.toml | 2 +- src/slurminade/__init__.py | 4 +- src/slurminade/batch.py | 168 ----------------------------------- src/slurminade/dispatcher.py | 19 ++-- src/slurminade/function.py | 5 +- tests/test_dispatch_guard.py | 2 +- 9 files changed, 38 insertions(+), 197 deletions(-) delete mode 100644 src/slurminade/batch.py diff --git a/README.rst b/README.rst index 0af6292..165be28 100644 --- a/README.rst +++ b/README.rst @@ -53,7 +53,10 @@ A simple script could look like this: import slurminade - slurminade.update_default_configuration(partition="alg") # global options for slurm + slurminade.update_default_configuration( + partition="alg", + exclusive=True + ) # global options for slurm # If no slurm environment is found, the functions are called directly to make scripts # compatible with any environment. @@ -84,26 +87,25 @@ A simple script could look like this: if __name__ == "__main__": - jid = prepare.distribute() + prepare.distribute() + slurminade.join() # make sure that no job runs before prepare has finished + with slurminade.JobBundling(max_size=20): # automatically bundles up to 20 tasks + # run 100x f after `prepare` has finished + for i in range(100): + f.distribute(i) - with slurminade.Batch(max_size=20) as batch: # automatically bundles up to 20 tasks - # run 100x f after `prepare` has finished - for i in range(100): - f.wait_for(jid).distribute(i) # no job id while in batch! - - # clean up after the previous jobs have finished - jids = batch.flush() # flush returns a list with all job ids. 
- clean_up.wait_for(jids).distribute() + slurminade.join() # make sure that the clean-up job runs after all f-jobs have finished + clean_up.distribute() If slurm is not available, ``distribute`` results in a local function call. Analogous for ``srun`` and ``sbatch`` (giving some extra value on top of just forwarding to *simple_slurm*). .. warning:: - Always use ``Batch`` when distributing many small tasks to few nodes. Slurm + Always use ``JobBundling`` when distributing many small tasks to few nodes. Slurm jobs have a certain overhead and you do not want to spam your infrastructure with too many jobs. However, function calls - joined by ``Batch`` are considered as a single job by slurm, thus, + joined by ``JobBundling`` are considered as a single job by slurm, thus, not shared across nodes. **What are the limitations of slurminade?** Slurminade reconstructs the @@ -343,7 +345,7 @@ Project structure The project is reasonably easy: -- batch.py: Contains code for bundling tasks, so we don’t spam slurm +- bundling.py: Contains code for bundling tasks, so we don’t spam slurm with too many. - conf.py: Contains code for managing the configuration of slurm. - dispatcher.py: Contains code for actually dispatching tasks to slurm. @@ -358,6 +360,7 @@ The project is reasonably easy: Changes ------- +- 0.10.0: `Batch` is now named `JobBundling`. There is a method `join` for easier synchronization. `exec` allows executing commands just like `srun` and `sbatch`, but with a syntax uniform with other slurmified functions. Functions can now also be called with `distribute_and_wait`. If you call `python3 -m slurminade.check --partition YOUR_PARTITION --constraint YOUR_CONSTRAINT` you can check if your slurm configuration is running correctly. - 0.9.0: Lots of improvements. - 0.8.1: Bugfix and automatic detection of wrong usage when using ``Batch`` with ``wait_for``. - 0.8.0: Added extensive logging and improved typing. 
diff --git a/examples/example_1.py b/examples/example_1.py index f7b0e0f..78135de 100644 --- a/examples/example_1.py +++ b/examples/example_1.py @@ -1,7 +1,7 @@ import datetime import slurminade -from slurminade import Batch +from slurminade import JobBundling slurminade.update_default_configuration(partition="alg", constraint="alggen02") @@ -15,6 +15,6 @@ def f(hello_world): if __name__ == "__main__": jid = f.distribute(f"Hello World from slurminade! {datetime.datetime.now()!s}") - with Batch(20) as batch: + with JobBundling(20) as batch: f.distribute("hello 1!") f.distribute("hello 2!") diff --git a/examples/example_3.py b/examples/example_3.py index 2d0b5a5..33dedfe 100644 --- a/examples/example_3.py +++ b/examples/example_3.py @@ -23,7 +23,7 @@ def clean_up(): if __name__ == "__main__": jid = prepare.distribute() - with slurminade.Batch(max_size=20) as batch: # automatically bundles up to 20 tasks + with slurminade.JobBundling(max_size=20) as batch: # automatically bundles up to 20 tasks # run 10x f after prepare has finished for i in range(100): f.wait_for(jid).distribute(i) diff --git a/pyproject.toml b/pyproject.toml index d321838..d887de0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ where = ["src"] [project] name = "slurminade" -version = "1.0.0" +version = "0.10.0" authors = [ { name = "TU Braunschweig, IBR, Algorithms Group (Dominik Krupke)", email = "krupke@ibr.cs.tu-bs.de" }, ] diff --git a/src/slurminade/__init__.py b/src/slurminade/__init__.py index 03549a9..2a9570f 100644 --- a/src/slurminade/__init__.py +++ b/src/slurminade/__init__.py @@ -62,7 +62,7 @@ def clean_up(): allow_recursive_distribution, disable_warning_on_repeated_flushes, ) -from .batch import Batch +from .bundling import JobBundling from .dispatcher import ( srun, sbatch, @@ -83,7 +83,7 @@ def clean_up(): "set_dispatch_limit", "allow_recursive_distribution", "disable_warning_on_repeated_flushes", - "Batch", + "JobBundling", "srun", "join", "sbatch", diff --git 
a/src/slurminade/batch.py b/src/slurminade/batch.py deleted file mode 100644 index 65ae8ae..0000000 --- a/src/slurminade/batch.py +++ /dev/null @@ -1,168 +0,0 @@ -""" -Contains code for bundling function calls together. -""" -import logging -import typing -from collections import defaultdict - -from .dispatcher import ( - Dispatcher, - FunctionCall, - get_dispatcher, - set_dispatcher, -) -from .function import SlurmFunction -from .guard import BatchGuard -from .options import SlurmOptions - - -class TaskBuffer: - """ - A simple container to buffer all the tasks by their options. - We can only bundle tasks with the same slurm options. - """ - - def __init__(self): - self._tasks = defaultdict(list) - - def add(self, task: FunctionCall, options: SlurmOptions) -> int: - self._tasks[options].append(task) - return len(self._tasks[options]) - - def items(self): - for opt, tasks in self._tasks.items(): - if tasks: - yield opt, tasks - - def get(self, options: SlurmOptions) -> typing.List[FunctionCall]: - return self._tasks[options] - - def clear(self): - self._tasks.clear() - - -class Batch(Dispatcher): - """ - The logic to buffer the function calls. It wraps the original dispatcher. - - You can use:: - - with slurminade.Batch(max_size=20) as batch: # automatically bundles up to 20 tasks - # run 100x f - for i in range(100): - f.distribute(i) - - to automatically bundle up to 20 tasks and distribute them. - """ - - def __init__(self, max_size: int): - """ - :param max_size: Bundle up to this many calls. - """ - super().__init__() - self.max_size = max_size - self.subdispatcher = get_dispatcher() - self._tasks = TaskBuffer() - self._batch_guard = BatchGuard() - - def flush(self, options: typing.Optional[SlurmOptions] = None) -> typing.List[int]: - """ - Distribute all buffered tasks. Return the job ids used. - This method is called automatically when the context is exited. 
- However, you may want to call it manually to get the job ids, - for example to use them for dependency management with ``wait_for``. - :param options: Only flush tasks with specific options. - :return: A list of job ids. - """ - job_ids = [] - if options is None: - for opt, tasks in self._tasks.items(): - while tasks: - job_id = self.subdispatcher(tasks[: self.max_size], opt) - job_ids.append(job_id) - tasks = tasks[self.max_size :] - - else: - tasks = self._tasks.get(options) - self._batch_guard.report_flush(len(tasks)) - while len(tasks) > self.max_size: - job_id = self.subdispatcher(tasks[: self.max_size], options) - job_ids.append(job_id) - tasks = tasks[: self.max_size] - self._tasks.clear() - return job_ids - - def add(self, func: SlurmFunction, *args, **kwargs): - """ - You can also add a task using `add` instead of `distribute`. - :param func: The function to call - :param args: The positional arguments - :param kwargs: The keywords arguments. - :return: None - """ - self._dispatch( - [FunctionCall(func.func_id, args, kwargs)], func.special_slurm_opts - ) - - def _dispatch( - self, - funcs: typing.Iterable[FunctionCall], - options: SlurmOptions, - block: bool = False, - ) -> int: - if block: - # if blocking, we don't buffer, but dispatch immediately - return self.subdispatcher._dispatch(funcs, options, block=True) - for func in funcs: - self._tasks.add(func, options) - return -1 - - def srun( - self, - command: str, - conf: typing.Optional[typing.Dict] = None, - simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ): - conf = SlurmOptions(conf if conf else {}) - return self.subdispatcher.srun(command, conf, simple_slurm_kwargs) - - def sbatch( - self, - command: str, - conf: typing.Optional[typing.Dict] = None, - simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ): - conf = SlurmOptions(conf if conf else {}) - return self.subdispatcher.sbatch(command, conf, simple_slurm_kwargs) - - def __enter__(self): - self.subdispatcher = 
get_dispatcher() - set_dispatcher(self) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type: - logging.getLogger("slurminade").error("Aborted due to exception.") - return - self.flush() - set_dispatcher(self.subdispatcher) - - def _log_dispatch(self, funcs: typing.List[FunctionCall], options: SlurmOptions): - if len(funcs) == 1: - logging.getLogger("slurminade").info( - f"Adding task to batch with options {options}: {funcs[0]}" - ) - else: - logging.getLogger("slurminade").info( - f"Adding {len(funcs)} tasks to batch with options {options}: {', '.join([str(f) for f in funcs])}" - ) - - def __del__(self): - self.flush() - - def join(self): - self.flush() - return self.subdispatcher.join() - - def is_sequential(self): - return self.subdispatcher.is_sequential() diff --git a/src/slurminade/dispatcher.py b/src/slurminade/dispatcher.py index acead57..a835f4d 100644 --- a/src/slurminade/dispatcher.py +++ b/src/slurminade/dispatcher.py @@ -231,7 +231,9 @@ def _dispatch( logging.getLogger("slurminade").debug(command) if block: ret = slurm.srun(command) - logging.getLogger("slurminade").info("Returned from srun with exit code %s", ret) + logging.getLogger("slurminade").info( + "Returned from srun with exit code %s", ret + ) return None jid = slurm.sbatch(command) self._all_job_ids.append(jid) @@ -433,7 +435,11 @@ def srun( if conf is None: conf = {} conf = SlurmOptions(**conf) - command = command if isinstance(command, str) else " ".join((shlex.quote(c) for c in command)) + command = ( + command + if isinstance(command, str) + else " ".join(shlex.quote(c) for c in command) + ) return get_dispatcher().srun(command, conf, simple_slurm_kwargs) @@ -453,7 +459,11 @@ def sbatch( if conf is None: conf = {} conf = SlurmOptions(**conf) - command = command if isinstance(command, str) else " ".join((shlex.quote(c) for c in command)) + command = ( + command + if isinstance(command, str) + else " ".join(shlex.quote(c) for c in command) + ) return 
get_dispatcher().sbatch(command, conf, simple_slurm_kwargs) @@ -463,6 +473,3 @@ def join(): :return: None """ get_dispatcher().join() - - - diff --git a/src/slurminade/function.py b/src/slurminade/function.py index 33a136f..303afa3 100644 --- a/src/slurminade/function.py +++ b/src/slurminade/function.py @@ -1,7 +1,7 @@ import inspect +import subprocess import typing from enum import Enum -import subprocess from .dispatcher import FunctionCall, dispatch, get_dispatcher from .function_map import FunctionMap @@ -191,11 +191,10 @@ def dec(func) -> SlurmFunction: return dec - @slurmify() def exec(cmd: typing.Union[str, typing.List[str]]): """ Execute a command. :param cmd: The command to be executed. """ - subprocess.run(cmd, check=True) \ No newline at end of file + subprocess.run(cmd, check=True) diff --git a/tests/test_dispatch_guard.py b/tests/test_dispatch_guard.py index 13fbe14..d5aae36 100644 --- a/tests/test_dispatch_guard.py +++ b/tests/test_dispatch_guard.py @@ -29,7 +29,7 @@ def test_dispatch_limit(self): def test_dispatch_limit_batch(self): slurminade.set_entry_point(__file__) set_dispatch_limit(2) - with slurminade.Batch(max_size=2): + with slurminade.JobBundling(max_size=2): for _ in range(4): f.distribute() self.assertRaises(TooManyDispatchesError, f.distribute)