Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.1.0 #23

Merged
merged 9 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,4 @@ dmypy.json
# Pyre type checker
.pyre/
.idea
examples/slurminade_example.txt
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ The project is reasonably easy:
Changes
-------

- 1.1.0: Slurminade can now be called from IPython, too! `exec` has been renamed to `shell` to prevent confusion with the Python built-in `exec`, which executes a string as Python code.
- 1.0.1: Dispatchers now return job references instead of job ids. This allows for some fancier stuff in the future, when the job info is only available a short time after the job has been submitted.
- 0.10.1: FIX: Listing functions will no longer execute setup functions.
- 0.10.0: `Batch` is now named `JobBundling`. There is a method `join` for easier synchronization. `exec` allows executing commands just like `srun` and `sbatch`, but with a syntax uniform with other slurmified functions. Functions can now also be called with `distribute_and_wait`. If you call `python3 -m slurminade.check --partition YOUR_PARTITION --constraint YOUR_CONSTRAINT` you can check if your slurm configuration is running correctly.
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ where = ["src"]

[project]
name = "slurminade"
version = "1.0.1"
version = "1.1.0"
authors = [
{ name = "TU Braunschweig, IBR, Algorithms Group (Dominik Krupke)", email = "krupke@ibr.cs.tu-bs.de" },
]
Expand Down Expand Up @@ -69,7 +69,7 @@ extend-ignore = [
"PT004", # Incorrect, just usefixtures instead.
"RUF009", # Too easy to get a false positive
]
target-version = "py37"
target-version = "py38"
src = ["src"]
unfixable = ["T20", "F841"]
exclude = []
Expand All @@ -78,7 +78,7 @@ exclude = []
[tool.mypy]
files = ["src", "tests"]
mypy_path = ["$MYPY_CONFIG_FILE_DIR/src"]
python_version = "3.7"
python_version = "3.8"
warn_unused_configs = true
show_error_codes = true
enable_error_code = ["ignore-without-code", "redundant-expr", "truthy-bool"]
Expand All @@ -95,7 +95,7 @@ ignore_missing_imports = true


[tool.pylint]
py-version = "3.7"
py-version = "3.8"
jobs = "0"
reports.output-format = "colorized"
similarities.ignore-imports = "yes"
Expand Down
3 changes: 2 additions & 1 deletion src/slurminade/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def clean_up():
"""

# flake8: noqa F401
from .function import slurmify
from .function import slurmify, shell
from .conf import update_default_configuration, set_default_configuration
from .guard import (
set_dispatch_limit,
Expand Down Expand Up @@ -94,6 +94,7 @@ def clean_up():
"TestDispatcher",
"SubprocessDispatcher",
"set_entry_point",
"shell",
"node_setup",
]

Expand Down
60 changes: 26 additions & 34 deletions src/slurminade/bundling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import typing
from collections import defaultdict
from pathlib import Path

from .dispatcher import (
Dispatcher,
Expand All @@ -12,24 +13,24 @@
set_dispatcher,
)
from .function import SlurmFunction
from .guard import BatchGuard
from .options import SlurmOptions
from .job_reference import JobReference
from .options import SlurmOptions


class BundlingJobReference(JobReference):
def __init__(self) -> None:
super().__init__()
pass

def get_job_id(self) -> typing.Optional[int]:
return None

def get_exit_code(self) -> typing.Optional[int]:
return None

def get_info(self) -> typing.Dict[str, typing.Any]:
return {}


class TaskBuffer:
"""
A simple container to buffer all the tasks by their options.
Expand All @@ -39,21 +40,19 @@ class TaskBuffer:
def __init__(self):
self._tasks = defaultdict(list)

def add(self, task: FunctionCall, options: SlurmOptions) -> int:
self._tasks[options].append(task)
return len(self._tasks[options])
def add(self, task: FunctionCall, options: SlurmOptions, entry_point: Path) -> int:
self._tasks[(entry_point, options)].append(task)
return len(self._tasks[(entry_point, options)])

def items(self):
for opt, tasks in self._tasks.items():
for (entry_point, opt), tasks in self._tasks.items():
if tasks:
yield opt, tasks

def get(self, options: SlurmOptions) -> typing.List[FunctionCall]:
return self._tasks[options]
yield entry_point, opt, tasks

def clear(self):
self._tasks.clear()


class JobBundling(Dispatcher):
"""
The logic to buffer the function calls. It wraps the original dispatcher.
Expand All @@ -76,10 +75,9 @@ def __init__(self, max_size: int):
self.max_size = max_size
self.subdispatcher = get_dispatcher()
self._tasks = TaskBuffer()
self._batch_guard = BatchGuard()
self._all_job_ids = []

def flush(self, options: typing.Optional[SlurmOptions] = None) -> typing.List[int]:
def flush(self) -> typing.List[int]:
"""
Distribute all buffered tasks. Return the job ids used.
This method is called automatically when the context is exited.
Expand All @@ -89,24 +87,15 @@ def flush(self, options: typing.Optional[SlurmOptions] = None) -> typing.List[in
:return: A list of job ids.
"""
job_ids = []
if options is None:
for opt, tasks in self._tasks.items():
while tasks:
job_id = self.subdispatcher(tasks[: self.max_size], opt)
job_ids.append(job_id)
tasks = tasks[self.max_size :]

else:
tasks = self._tasks.get(options)
self._batch_guard.report_flush(len(tasks))
while len(tasks) > self.max_size:
job_id = self.subdispatcher(tasks[: self.max_size], options)
for entry_point, opt, tasks in self._tasks.items():
while tasks:
job_id = self.subdispatcher(tasks[: self.max_size], opt, entry_point)
job_ids.append(job_id)
tasks = tasks[: self.max_size]
tasks = tasks[self.max_size :]
self._tasks.clear()
self._all_job_ids.extend(job_ids)
return job_ids

def get_all_job_ids(self):
"""
Return all job ids that have been used.
Expand All @@ -122,20 +111,23 @@ def add(self, func: SlurmFunction, *args, **kwargs):
:return: None
"""
self._dispatch(
[FunctionCall(func.func_id, args, kwargs)], func.special_slurm_opts
[FunctionCall(func.func_id, args, kwargs)],
func.special_slurm_opts,
func.get_entry_point(),
)

def _dispatch(
self,
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
entry_point: Path,
block: bool = False,
) -> JobReference:
if block:
# if blocking, we don't buffer, but dispatch immediately
return self.subdispatcher._dispatch(funcs, options, block=True)
return self.subdispatcher._dispatch(funcs, options, entry_point, block=True)
for func in funcs:
self._tasks.add(func, options)
self._tasks.add(func, options, entry_point)
return BundlingJobReference()

def srun(
Expand Down Expand Up @@ -189,13 +181,13 @@ def is_sequential(self):
return self.subdispatcher.is_sequential()



class Batch(JobBundling):
"""
Compatibility alias for JobBundling. This is the old name. Deprecated.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
logging.getLogger("slurminade").warning(
"The `Batch` class has been renamed to `JobBundling`. Please update your code."
)
)
20 changes: 10 additions & 10 deletions src/slurminade/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def _write_to_file(path, content):
time.sleep(1)
# get hostname and write it to the file
hostname = socket.gethostname()
with open(path, "w") as file:
with Path(path).open("w") as file:
print("Hello from ", hostname)
file.write(content + "\n" + hostname)
# wait a second for the file to be written
Expand Down Expand Up @@ -43,23 +43,23 @@ def check_slurm(partition, constraint):

# create a temporary folder for the slurm check
with tempfile.TemporaryDirectory(dir=".") as tmpdir:
tmpdir = Path(tmpdir).resolve()
assert Path(tmpdir).exists()
tmpdir_ = Path(tmpdir).resolve()
assert tmpdir_.exists()
# Check 1
tmp_file_path = tmpdir / "check_1.txt"
tmp_file_path = tmpdir_ / "check_1.txt"
_write_to_file.distribute_and_wait(str(tmp_file_path), "test")
if not Path(tmp_file_path).exists():
msg = "Slurminade failed: The file was not written to the temporary directory."
raise Exception(msg)
with open(tmp_file_path) as file:
with Path(tmp_file_path).open() as file:
content = file.readlines()
print(
"Slurminade check 1 successful. Test was run on node",
content[1].strip(),
)

# Check 2
tmp_file_path = tmpdir / "check_2.txt"
tmp_file_path = tmpdir_ / "check_2.txt"
_write_to_file.distribute(str(tmp_file_path), "test")
# wait up to 1 minutes for the file to be written
for _ in range(60):
Expand All @@ -69,7 +69,7 @@ def check_slurm(partition, constraint):
if not Path(tmp_file_path).exists():
msg = "Slurminade failed: The file was not written to the temporary directory."
raise Exception(msg)
with open(tmp_file_path) as file:
with Path(tmp_file_path).open() as file:
content = file.readlines()
print(
"Slurminade check 2 successful. Test was run on node",
Expand All @@ -79,7 +79,7 @@ def check_slurm(partition, constraint):
join()

# Check 3
tmp_file_path = tmpdir / "check_3.txt"
tmp_file_path = tmpdir_ / "check_3.txt"
srun(["touch", str(tmp_file_path)])
time.sleep(1)
if not Path(tmp_file_path).exists():
Expand All @@ -89,12 +89,12 @@ def check_slurm(partition, constraint):
tmp_file_path.unlink()

# Check 4
tmp_file_path = tmpdir / "check_4.txt"
tmp_file_path = tmpdir_ / "check_4.txt"
_write_to_file.distribute_and_wait(str(tmp_file_path), "test")
if not Path(tmp_file_path).exists():
msg = "Slurminade failed: The file was not written to the temporary directory."
raise Exception(msg)
with open(tmp_file_path) as file:
with Path(tmp_file_path).open() as file:
content = file.readlines()
print(
"Slurminade check 1 successful. Test was run on node",
Expand Down
23 changes: 17 additions & 6 deletions src/slurminade/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
__default_conf: typing.Dict = {}


def _load_conf(path):
def _load_conf(path: Path):
try:
if os.path.isfile(path):
with open(path) as f:
if path.is_file():
with path.open() as f:
return json.load(f)
else:
return {}
Expand All @@ -25,25 +25,36 @@ def _load_conf(path):


def update_default_configuration(conf=None, **kwargs):
"""
Adds or updates the default configuration.
:param conf: A dictionary with the configuration.
:param kwargs: Configuration parameters. (alternative to giving a dictionary)
"""
if conf:
__default_conf.update(conf)
if kwargs:
__default_conf.update(kwargs)


def _load_default_conf():
path = os.path.join(Path.home(), CONFIG_NAME)
path = Path.home() / CONFIG_NAME
update_default_configuration(_load_conf(path))
if "XDG_CONFIG_HOME" in os.environ:
path = os.path.join(os.environ["XDG_CONFIG_HOME"], "slurminade", CONFIG_NAME)
path = Path(os.environ["XDG_CONFIG_HOME"]) / "slurminade" / CONFIG_NAME
update_default_configuration(_load_conf(path))
update_default_configuration(_load_conf(CONFIG_NAME))
update_default_configuration(_load_conf(Path(CONFIG_NAME)))


_load_default_conf()


def set_default_configuration(conf=None, **kwargs):
"""
Replaces the default configuration.
This will overwrite the default configuration with the given one.
:param conf: A dictionary with the configuration.
:param kwargs: Configuration parameters. (alternative to giving a dictionary)
"""
__default_conf = {}
update_default_configuration(conf, **kwargs)

Expand Down
Loading
Loading