diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index 6844a47..8f6bd54 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -11,7 +11,7 @@ permissions: jobs: deploy: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 3a2e651..f473d4a 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -8,7 +8,7 @@ on: jobs: unittest: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: matrix: python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] @@ -30,7 +30,7 @@ jobs: lint: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v2 @@ -51,7 +51,7 @@ jobs: type-check: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v2 @@ -65,4 +65,4 @@ jobs: pip install .[test] - name: type check with mypy run: | - mypy . + mypy firecrest diff --git a/.mypy.ini b/.mypy.ini new file mode 100644 index 0000000..f3a193e --- /dev/null +++ b/.mypy.ini @@ -0,0 +1,2 @@ +[mypy-unasync] +ignore_missing_imports = True diff --git a/README.md b/README.md index 6ab8625..4004d7f 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ keycloak = f7t.ClientCredentialsAuth( ) # Setup the client for the specific account -client = f7t.Firecrest( +client = f7t.v1.Firecrest( firecrest_url="http://localhost:8000", authorization=keycloak ) diff --git a/async-example.md b/async-example.md index a5c4a2a..41432b7 100644 --- a/async-example.md +++ b/async-example.md @@ -47,7 +47,7 @@ async def workflow(client, i): async def main(): auth = firecrest.ClientCredentialsAuth(client_id, client_secret, token_uri) - client = firecrest.AsyncFirecrest(firecrest_url, authorization=auth) + client = firecrest.v1.AsyncFirecrest(firecrest_url, authorization=auth) # Set up the desired polling rate for each microservice. The float number # represents the number of seconds between consecutive requests in each diff --git a/docs/source/index.rst b/docs/source/index.rst index e16f84f..bb58986 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -14,11 +14,20 @@ You can also clone it from `Github `__ .. toctree:: :maxdepth: 2 - :caption: Contents: + :caption: Getting started: authorization tutorial_index - reference_index + + +.. toctree:: + :maxdepth: 2 + :caption: Reference: + + reference_auth + reference_v1_index + reference_v2_index + Contact ======= diff --git a/docs/source/reference_async.rst b/docs/source/reference_async.rst index 79679f0..11a0376 100644 --- a/docs/source/reference_async.rst +++ b/docs/source/reference_async.rst @@ -5,7 +5,7 @@ The library also provides an asynchronous API for the client: The ``AsyncFirecrest`` class **************************** -.. autoclass:: firecrest.AsyncFirecrest +.. autoclass:: firecrest.v1.AsyncFirecrest :members: :undoc-members: :show-inheritance: @@ -13,7 +13,7 @@ The ``AsyncFirecrest`` class The ``AsyncExternalDownload`` class *********************************** -.. autoclass:: firecrest.AsyncExternalDownload +.. autoclass:: firecrest.v1.AsyncExternalDownload :inherited-members: :members: :undoc-members: @@ -22,7 +22,7 @@ The ``AsyncExternalDownload`` class The ``AsyncExternalUpload`` class ********************************* -.. autoclass:: firecrest.AsyncExternalUpload +.. 
autoclass:: firecrest.v1.AsyncExternalUpload :inherited-members: :members: :undoc-members: diff --git a/docs/source/reference_async_v2.rst b/docs/source/reference_async_v2.rst new file mode 100644 index 0000000..2bc05ca --- /dev/null +++ b/docs/source/reference_async_v2.rst @@ -0,0 +1,11 @@ +Asynchronous FirecREST objects +============================== + +The library also provides an asynchronous API for the client: + +The ``AsyncFirecrest`` class +**************************** +.. autoclass:: firecrest.v2.AsyncFirecrest + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/reference_auth.rst b/docs/source/reference_auth.rst new file mode 100644 index 0000000..f65179f --- /dev/null +++ b/docs/source/reference_auth.rst @@ -0,0 +1,9 @@ +Authorization +============= + +The ``ClientCredentialsAuth`` class +*********************************** +.. autoclass:: firecrest.ClientCredentialsAuth + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/reference_basic.rst b/docs/source/reference_basic.rst index 8fee278..03cb699 100644 --- a/docs/source/reference_basic.rst +++ b/docs/source/reference_basic.rst @@ -6,7 +6,7 @@ Together with the authorisation class it takes care of the token and makes the a The ``Firecrest`` class *********************** -.. autoclass:: firecrest.Firecrest +.. autoclass:: firecrest.v1.Firecrest :members: :undoc-members: :show-inheritance: @@ -14,7 +14,7 @@ The ``Firecrest`` class The ``ExternalUpload`` class **************************** -.. autoclass:: firecrest.ExternalUpload +.. autoclass:: firecrest.v1.ExternalUpload :inherited-members: :members: :undoc-members: @@ -23,21 +23,13 @@ The ``ExternalUpload`` class The ``ExternalDownload`` class ****************************** -.. autoclass:: firecrest.ExternalDownload +.. autoclass:: firecrest.v1.ExternalDownload :inherited-members: :members: :undoc-members: :show-inheritance: -The ``ClientCredentialsAuth`` class -*********************************** -.. autoclass:: firecrest.ClientCredentialsAuth - :members: - :undoc-members: - :show-inheritance: - - Custom types of the library *************************** .. automodule:: firecrest.types diff --git a/docs/source/reference_sync_v2.rst b/docs/source/reference_sync_v2.rst new file mode 100644 index 0000000..ad66f17 --- /dev/null +++ b/docs/source/reference_sync_v2.rst @@ -0,0 +1,11 @@ +FirecREST objects +============================== + +Here is the API for the client: + +The ``Firecrest`` class +**************************** +.. autoclass:: firecrest.v2.Firecrest + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/reference_index.rst b/docs/source/reference_v1_index.rst similarity index 84% rename from docs/source/reference_index.rst rename to docs/source/reference_v1_index.rst index f81cd1b..cb967b8 100644 --- a/docs/source/reference_index.rst +++ b/docs/source/reference_v1_index.rst @@ -1,5 +1,5 @@ -Reference -========= +API v1 +====== .. toctree:: :maxdepth: 2 diff --git a/docs/source/reference_v2_index.rst b/docs/source/reference_v2_index.rst new file mode 100644 index 0000000..27ed4b4 --- /dev/null +++ b/docs/source/reference_v2_index.rst @@ -0,0 +1,9 @@ +API v2 +====== + +.. 
toctree:: + :maxdepth: 2 + :caption: Contents: + + reference_sync_v2 + reference_async_v2 diff --git a/docs/source/tutorial_async.rst b/docs/source/tutorial_async.rst index 8652ae1..b28e473 100644 --- a/docs/source/tutorial_async.rst +++ b/docs/source/tutorial_async.rst @@ -1,5 +1,5 @@ -How to use the asynchronous API [experimental] -============================================== +How to use the asynchronous API +=============================== In this tutorial, we will explore the asynchronous API of the pyFirecREST library. Asynchronous programming is a powerful technique that allows you to write more efficient and responsive code by handling concurrent tasks without blocking the main execution flow. @@ -15,7 +15,7 @@ First you will need to create an ``AsyncFirecrest`` object, instead of the simpl .. code-block:: Python - client = fc.AsyncFirecrest( + client = fc.v1.AsyncFirecrest( firecrest_url=, authorization=MyAuthorizationClass() ) diff --git a/docs/source/tutorial_basic.rst b/docs/source/tutorial_basic_v1.rst similarity index 98% rename from docs/source/tutorial_basic.rst rename to docs/source/tutorial_basic_v1.rst index 14a7393..e093540 100644 --- a/docs/source/tutorial_basic.rst +++ b/docs/source/tutorial_basic_v1.rst @@ -1,5 +1,5 @@ -Simple tutorial -=============== +Tutorial for FirecREST v1 +========================= Your starting point to use pyFirecREST will be the creation of a FirecREST object. This is simply a mini client that, in cooperation with the authorization object, will take care of the necessary requests that need to be made and handle the responses. @@ -19,7 +19,7 @@ For this tutorial we will assume the simplest kind of authorization class, where return # Setup the client with the appropriate URL and the authorization class - client = fc.Firecrest(firecrest_url=, authorization=MyAuthorizationClass()) + client = fc.v1.Firecrest(firecrest_url=, authorization=MyAuthorizationClass()) Simple blocking requests diff --git a/docs/source/tutorial_basic_v2.rst b/docs/source/tutorial_basic_v2.rst new file mode 100644 index 0000000..eb13e38 --- /dev/null +++ b/docs/source/tutorial_basic_v2.rst @@ -0,0 +1,205 @@ +Tutorial for FirecREST v2 +========================= + +This tutorial will guide you through the basic functionalities of v2 of the FirecREST API. +Since the API of FirecREST v2 has some important differences the python client, cannot be the same as the one for FirecREST v1. + +Your starting point to use pyFirecREST will be the creation of a `FirecREST` object. +This is simply a mini client that, in cooperation with the authorization object, will take care of the necessary requests that need to be made and handle the responses. + +If you want to understand how to setup your authorization object have a look at the previous section. +For this tutorial we will assume the simplest kind of authorization class, where the same token will always be used. + +.. code-block:: Python + + import firecrest as fc + + class MyAuthorizationClass: + def __init__(self): + pass + + def get_access_token(self): + return + + # Setup the client with the appropriate URL and the authorization class + client = fc.v2.Firecrest(firecrest_url=, authorization=MyAuthorizationClass()) + + +Simple blocking requests +------------------------ + +Most of the methods of the FirecREST object require a simple http request to FirecREST. +With the client we just created here are a couple of examples of listing the files of a directory or getting all the available systems of FirecREST. 
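+
+All the calls in this section are blocking: each method returns only once FirecREST has answered the request.
+If a request fails (for example because of an invalid or expired token), the client raises an exception from the ``FirecrestException`` module, so you may want to guard the calls.
+Below is a minimal sketch of this pattern, assuming the ``client`` object and the ``fc`` import created above:
+
+.. code-block:: Python
+
+    try:
+        systems = client.systems()
+        print(f"Available systems: {systems}")
+    except fc.FirecrestException as e:
+        # Printing the exception gives more information about the kind of
+        # failure, for example an invalid or expired token
+        print(e)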
+ + +Getting all the available systems +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A good starting point is to retrieve the list of systems available in FirecREST. This validates your token and helps you choose a target system for future requests. + +.. code-block:: Python + + systems = client.systems() + print(systems) + +Systems is going to be a list of systems and their properties, and you will have to choose from one of them. +This is an example of the output: + +.. code-block:: json + + [ + { + "name": "cluster", + "host": "cluster.alps.cscs.ch", + "sshPort": 22, + "sshCertEmbeddedCmd": true, + "scheduler": { + "type": "slurm", + "version": "24.05.4", + "apiUrl": null, + "apiVersion": null + }, + "servicesHealth": [ + { + "serviceType": "scheduler", + "lastChecked": "2025-01-06T11:09:29.975235Z", + "latency": 0.6163430213928223, + "healthy": true, + "message": null, + "nodes": { + "available": 280, + "total": 600 + } + }, + { + "serviceType": "ssh", + "lastChecked": "2025-01-06T11:09:29.951104Z", + "latency": 0.5919253826141357, + "healthy": true, + "message": null + }, + { + "serviceType": "filesystem", + "lastChecked": "2025-01-06T11:09:29.955848Z", + "latency": 0.5964689254760742, + "healthy": true, + "message": null, + "path": "/capstor/scratch/cscs" + }, + { + "serviceType": "filesystem", + "lastChecked": "2025-01-06T11:09:29.955997Z", + "latency": 0.59639573097229, + "healthy": true, + "message": null, + "path": "/users" + }, + { + "serviceType": "filesystem", + "lastChecked": "2025-01-06T11:09:29.955792Z", + "latency": 0.5958302021026611, + "healthy": true, + "message": null, + "path": "/capstor/store/cscs" + } + ], + "probing": { + "interval": 300, + "timeout": 10, + "maxLatency": null, + "maxLoad": null + }, + "fileSystems": [ + { + "path": "/capstor/scratch/cscs", + "dataType": "scratch", + "defaultWorkDir": true + }, + { + "path": "/users", + "dataType": "users", + "defaultWorkDir": false + }, + { + "path": "/capstor/store/cscs", + "dataType": "store", + "defaultWorkDir": false + } + ], + "datatransferJobsDirectives": [ + "#SBATCH --nodes=1", + "#SBATCH --time=0-00:15:00" + ], + "timeouts": { + "sshConnection": 5, + "sshLogin": 5, + "sshCommandExecution": 5 + } + } + ] + +Listing files in a directory +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Let's say you want to list the directory in the filesystem of a machine called "cluster". +You can get a list of the files, with all the usual properties that ls provides (size, type, permissions etc). + +.. code-block:: Python + + files = client.list_files("cluster", "/home/test_user") + print(files) + +The output will be something like this: + +.. code-block:: json + + [ + { + "group": "test_user", + "lastModified": "2020-04-11T14:53:11", + "linkTarget": "", + "name": "test_directory", + "permissions": "rwxrwxr-x", + "size": "4096", + "type": "d", + "user": "test_user" + }, + { + "group": "test_user", + "lastModified": "2020-04-11T14:14:23", + "linkTarget": "", + "name": "test_file.txt", + "permissions": "rw-rw-r--", + "size": "10", + "type": "-", + "user": "test_user" + } + ] + +Interact with the scheduler +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +FirecREST v2 simplifies job submission, monitoring, and cancellation. These operations now require only a single API request. +As a result the pyFirecREST client has been simplified and the user can interact with the scheduler in a more efficient way. + +This is how can make a simple job submission, when the batch script is on your local filesystem: + +.. 
code-block:: Python + + job = client.submit("cluster", working_directory="/home/test_user", script_local_path="script.sh") + print(job) + +For a successful submission the output would look like this. + +.. code-block:: json + + { + "jobid": 42, + } + +In FirecREST v2, the user selects the working directory where the job will be submitted from. + +.. Transfer of large files +.. ----------------------- + +.. TODO diff --git a/docs/source/tutorial_cli.rst b/docs/source/tutorial_cli.rst index c11aac6..93ca71e 100644 --- a/docs/source/tutorial_cli.rst +++ b/docs/source/tutorial_cli.rst @@ -3,6 +3,10 @@ How to use the CLI After version 1.3.0, pyFirecREST comes together with a CLI but for now it can only be used with the ``ClientCredentialsAuth`` authentication class. +.. attention:: + + The CLI currently only supports FirecREST v1. Support for v2 is planned for the next release. + You will need to set the environment variables ``FIRECREST_CLIENT_ID``, ``FIRECREST_CLIENT_SECRET`` and ``AUTH_TOKEN_URL`` to set up the Client Credentials client, as well as ``FIRECREST_URL`` with the URL for the FirecREST instance you are using. After that you can explore the capabilities of the CLI with the `--help` option: diff --git a/docs/source/tutorial_errors.rst b/docs/source/tutorial_errors.rst index 71bb348..21e622f 100644 --- a/docs/source/tutorial_errors.rst +++ b/docs/source/tutorial_errors.rst @@ -14,8 +14,8 @@ Here is an example of the code that will handle those failures. try: - parameters = client.parameters() - print(f"Firecrest parameters: {parameters}") + files = client.list_files("cluster", "/home/test_user") + print(f"List of files: {files}") except fc.FirecrestException as e: # You can just print the exception to get more information about the type of error, # for example an invalid or expired token. diff --git a/docs/source/tutorial_index.rst b/docs/source/tutorial_index.rst index cfa7af3..aac1abd 100644 --- a/docs/source/tutorial_index.rst +++ b/docs/source/tutorial_index.rst @@ -5,7 +5,8 @@ Tutorials :maxdepth: 2 :caption: Contents: - tutorial_basic + tutorial_basic_v1 + tutorial_basic_v2 tutorial_logging tutorial_errors tutorial_cli diff --git a/firecrest/FirecrestException.py b/firecrest/FirecrestException.py index 108f5c8..5ccd6ae 100644 --- a/firecrest/FirecrestException.py +++ b/firecrest/FirecrestException.py @@ -114,5 +114,18 @@ def __str__(self): ) +class TransferJobFailedException(Exception): + """Exception raised when the polling iterator is exhausted""" + + def __init__(self, transfer_job_info): + self._transfer_job_info = transfer_job_info + + def __str__(self): + return ( + f"Transfer job failed. Check the log files for more " + f"information: {self._transfer_job_info['transferJob']}" + ) + + class NotImplementedOnAPIversion(Exception): """Exception raised when a feature is not developed yet for the current API version""" diff --git a/firecrest/__init__.py b/firecrest/__init__.py index ce42e1c..3cf1a35 100644 --- a/firecrest/__init__.py +++ b/firecrest/__init__.py @@ -7,7 +7,7 @@ import sys -__version__ = "2.7.0" +__version__ = "3.0.0" __app_name__ = "firecrest" MIN_PYTHON_VERSION = (3, 7, 0) @@ -19,14 +19,7 @@ ) sys.exit(1) -from firecrest.BasicClient import Firecrest -from firecrest.AsyncClient import AsyncFirecrest -from firecrest.ExternalStorage import ExternalDownload, ExternalUpload, ExternalStorage -from firecrest.AsyncExternalStorage import ( - AsyncExternalDownload, - AsyncExternalUpload, - AsyncExternalStorage, -) +from . 
import v1, v2 from firecrest.Authorization import ClientCredentialsAuth from firecrest.FirecrestException import ( ClientsCredentialsException, diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index b46dd6e..edea3ab 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -65,7 +65,7 @@ "repr.number": "none", } console = Console(theme=Theme(custom_theme)) -client: fc.Firecrest = None # type: ignore +client: fc.v1.Firecrest = None # type: ignore logger = logging.getLogger(__name__) @@ -1768,7 +1768,7 @@ def main( global client auth_obj = fc.ClientCredentialsAuth(client_id, client_secret, token_url) auth_obj.timeout = auth_timeout - client = fc.Firecrest(firecrest_url=firecrest_url, authorization=auth_obj) + client = fc.v1.Firecrest(firecrest_url=firecrest_url, authorization=auth_obj) client.timeout = timeout if api_version: client.set_api_version(api_version) diff --git a/firecrest/path.py b/firecrest/path.py index d36a795..f157cae 100644 --- a/firecrest/path.py +++ b/firecrest/path.py @@ -11,8 +11,9 @@ import tempfile from typing import Callable, Iterator, List -from firecrest import Firecrest, ClientCredentialsAuth -from firecrest.BasicClient import logger as FcLogger +from firecrest import ClientCredentialsAuth +from firecrest.v1 import Firecrest +from firecrest.v1.BasicClient import logger as FcLogger from firecrest.FirecrestException import HeaderException @@ -310,7 +311,7 @@ def _lstat_mode(self) -> int: self._cache.lst_mode = item._cache.lst_mode return item._cache.lst_mode raise FileNotFoundError(self) - + def resolve(self) -> Self: """Resolve a path, removing '..' and '.' components.""" parts: List[str] = [] @@ -349,7 +350,7 @@ def _stat_mode(self) -> int: if stat.S_ISLNK(mode): if not item._cache.link_target: raise FileNotFoundError(f"Symlink has no target path: {self}") - pureposixpath = PurePosixPath(item._cache.link_target) + pureposixpath = PurePosixPath(item._cache.link_target) if not pureposixpath.is_absolute(): path = path.parent.joinpath(pureposixpath).resolve() else: diff --git a/firecrest/utilities.py b/firecrest/utilities.py index 6f202d4..fef069b 100644 --- a/firecrest/utilities.py +++ b/firecrest/utilities.py @@ -29,7 +29,10 @@ def slurm_state_completed(state): 'TIMEOUT', } if state: - return all(s in completion_states for s in state.split(',')) + # Make sure all the steps include one of the completion states + return all( + any(cs in s for cs in completion_states) for s in state.split(',') + ) return False diff --git a/firecrest/AsyncClient.py b/firecrest/v1/AsyncClient.py similarity index 99% rename from firecrest/AsyncClient.py rename to firecrest/v1/AsyncClient.py index c94c7bb..7fb34e9 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/v1/AsyncClient.py @@ -27,7 +27,7 @@ import firecrest.FirecrestException as fe import firecrest.types as t -from firecrest.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload +from firecrest.v1.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload from firecrest.utilities import ( async_validate_api_version_compatibility, parse_retry_after, @@ -921,7 +921,8 @@ async def compress( :param target_path: the absolute target path :param dereference: follow symbolic links :param fail_on_timeout: if `True` on timeout, this method will raise an - exception and won't fall back to submitting a long running job + exception and won't fall back to submitting a long + running job :calls: POST `/utilities/compress` .. 
warning:: This is available only for FirecREST>=1.16.0 @@ -1021,7 +1022,8 @@ async def extract( :param target_path: the absolute target path where the `source_path` is extracted :param file_extension: possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2` :param fail_on_timeout: if `True` on timeout, this method will raise an - exception and won't fall back to submitting a long running job + exception and won't fall back to submitting a + long running job :calls: POST `/utilities/extract` .. warning:: This is available only for FirecREST>=1.16.0 @@ -1678,10 +1680,12 @@ async def reservations( ) -> List[t.ReservationInfo]: """Retrieves information about the reservations. This call uses the `scontrol show reservations` command. + :param machine: the machine name where the scheduler belongs to :param reservations: specific reservations to query :calls: GET `/compute/reservations` GET `/tasks` + .. warning:: This is available only for FirecREST>=1.16.0 """ params = {} diff --git a/firecrest/AsyncExternalStorage.py b/firecrest/v1/AsyncExternalStorage.py similarity index 97% rename from firecrest/AsyncExternalStorage.py rename to firecrest/v1/AsyncExternalStorage.py index 46713f7..2e6bd07 100644 --- a/firecrest/AsyncExternalStorage.py +++ b/firecrest/v1/AsyncExternalStorage.py @@ -8,7 +8,6 @@ import asyncio from io import BufferedWriter -import itertools import logging import pathlib import requests @@ -19,7 +18,7 @@ from packaging.version import Version if TYPE_CHECKING: - from firecrest.AsyncClient import AsyncFirecrest + from firecrest.v1.AsyncClient import AsyncFirecrest as AsyncFirecrestV1 from contextlib import nullcontext from requests.compat import json # type: ignore @@ -39,7 +38,7 @@ class AsyncExternalStorage: def __init__( self, - client: AsyncFirecrest, + client: AsyncFirecrestV1, task_id: str, previous_responses: Optional[List[requests.Response]] = None, ) -> None: @@ -53,7 +52,7 @@ def __init__( self._responses = previous_responses @property - def client(self) -> AsyncFirecrest: + def client(self) -> AsyncFirecrestV1: """Returns the client that will be used to get information for the task.""" return self._client @@ -152,7 +151,7 @@ class AsyncExternalUpload(AsyncExternalStorage): def __init__( self, - client: AsyncFirecrest, + client: AsyncFirecrestV1, task_id: str, previous_responses: Optional[List[requests.Response]] = None, ) -> None: @@ -213,7 +212,7 @@ class AsyncExternalDownload(AsyncExternalStorage): def __init__( self, - client: AsyncFirecrest, + client: AsyncFirecrestV1, task_id: str, previous_responses: Optional[List[requests.Response]] = None, ) -> None: diff --git a/firecrest/BasicClient.py b/firecrest/v1/BasicClient.py similarity index 99% rename from firecrest/BasicClient.py rename to firecrest/v1/BasicClient.py index 94fba13..7ab924e 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/v1/BasicClient.py @@ -24,7 +24,7 @@ import firecrest.FirecrestException as fe import firecrest.types as t -from firecrest.ExternalStorage import ExternalUpload, ExternalDownload +from firecrest.v1.ExternalStorage import ExternalUpload, ExternalDownload from firecrest.utilities import ( parse_retry_after, slurm_state_completed, @@ -32,6 +32,7 @@ validate_api_version_compatibility ) + if sys.version_info >= (3, 8): from typing import Literal else: @@ -674,7 +675,8 @@ def compress( :param target_path: the absolute target path :param dereference: follow symbolic links :param fail_on_timeout: if `True` on timeout, this method will raise an - exception and won't fall back to 
submitting a long running job + exception and won't fall back to submitting a + long running job :calls: POST `/utilities/compress` .. warning:: This is available only for FirecREST>=1.16.0 @@ -776,7 +778,8 @@ def extract( :param target_path: the absolute target path where the `source_path` is extracted :param extension: file extension, possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2` :param fail_on_timeout: if `True` on timeout, this method will raise an - exception and won't fall back to submitting a long running job + exception and won't fall back to submitting a + long running job :calls: POST `/utilities/extract` .. warning:: This is available only for FirecREST>=1.16.0 @@ -1458,10 +1461,12 @@ def reservations( ) -> List[t.ReservationInfo]: """Retrieves information about the compute reservations. This call uses the `scontrol show reservations` command. + :param machine: the machine name where the scheduler belongs to :param nodes: specific reservations to query :calls: GET `/compute/reservations` GET `/tasks` + .. warning:: This is available only for FirecREST>=1.16.0 """ params = {} diff --git a/firecrest/ExternalStorage.py b/firecrest/v1/ExternalStorage.py similarity index 98% rename from firecrest/ExternalStorage.py rename to firecrest/v1/ExternalStorage.py index e828e2a..19d0c5e 100644 --- a/firecrest/ExternalStorage.py +++ b/firecrest/v1/ExternalStorage.py @@ -21,7 +21,7 @@ from packaging.version import Version if TYPE_CHECKING: - from firecrest.BasicClient import Firecrest + from firecrest.v1.BasicClient import Firecrest as FirecrestV1 from contextlib import nullcontext from requests.compat import json # type: ignore @@ -41,7 +41,7 @@ class ExternalStorage: def __init__( self, - client: Firecrest, + client: FirecrestV1, task_id: str, previous_responses: Optional[List[requests.Response]] = None, ) -> None: @@ -56,7 +56,7 @@ def __init__( self._responses = previous_responses @property - def client(self) -> Firecrest: + def client(self) -> FirecrestV1: """Returns the client that will be used to get information for the task.""" return self._client @@ -156,7 +156,7 @@ class ExternalUpload(ExternalStorage): def __init__( self, - client: Firecrest, + client: FirecrestV1, task_id: str, previous_responses: Optional[List[requests.Response]] = None, ) -> None: @@ -215,7 +215,7 @@ class ExternalDownload(ExternalStorage): def __init__( self, - client: Firecrest, + client: FirecrestV1, task_id: str, previous_responses: Optional[List[requests.Response]] = None, ) -> None: diff --git a/firecrest/v1/__init__.py b/firecrest/v1/__init__.py new file mode 100644 index 0000000..43c5e0a --- /dev/null +++ b/firecrest/v1/__init__.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2024, ETH Zurich. All rights reserved. +# +# Please, refer to the LICENSE file in the root directory. +# SPDX-License-Identifier: BSD-3-Clause +# + +from firecrest.v1.AsyncClient import AsyncFirecrest +from firecrest.v1.AsyncExternalStorage import ( + AsyncExternalDownload, + AsyncExternalUpload, + AsyncExternalStorage, +) +from firecrest.v1.BasicClient import Firecrest +from firecrest.v1.ExternalStorage import ( + ExternalDownload, + ExternalUpload, + ExternalStorage) + diff --git a/firecrest/v2/__init__.py b/firecrest/v2/__init__.py new file mode 100644 index 0000000..f467ea0 --- /dev/null +++ b/firecrest/v2/__init__.py @@ -0,0 +1,9 @@ +# +# Copyright (c) 2024, ETH Zurich. All rights reserved. +# +# Please, refer to the LICENSE file in the root directory. 
+# SPDX-License-Identifier: BSD-3-Clause +# + +from firecrest.v2._async.Client import AsyncFirecrest # noqa +from firecrest.v2._sync.Client import Firecrest # noqa diff --git a/firecrest/v2/_async/Client.py b/firecrest/v2/_async/Client.py new file mode 100644 index 0000000..40f22a4 --- /dev/null +++ b/firecrest/v2/_async/Client.py @@ -0,0 +1,1028 @@ +# +# Copyright (c) 2024, ETH Zurich. All rights reserved. +# +# Please, refer to the LICENSE file in the root directory. +# SPDX-License-Identifier: BSD-3-Clause +# +from __future__ import annotations + +import aiofiles +import asyncio +import httpx +import json +import logging +import os +import pathlib +import ssl + +from io import BytesIO +from packaging.version import Version, parse +from typing import Any, Optional, List + +from firecrest.utilities import ( + parse_retry_after, slurm_state_completed, time_block +) +from firecrest.FirecrestException import ( + FirecrestException, + TransferJobFailedException, + UnexpectedStatusException +) + + +logger = logging.getLogger(__name__) + + +# This function is temporarily here +def handle_response(response): + print("\nResponse status code:") + print(response.status_code) + print("\nResponse headers:") + print(json.dumps(dict(response.headers), indent=4)) + print("\nResponse json:") + try: + print(json.dumps(response.json(), indent=4)) + except json.JSONDecodeError: + print("-") + + +def sleep_generator(): + yield 0.2 # First yield 2 seconds because the api takes time to update + value = 0.5 + while True: + yield value + value *= 2 # Double the value for each iteration + + +class AsyncFirecrest: + """ + This is the basic class you instantiate to access the FirecREST API v2. + Necessary parameters are the firecrest URL and an authorization object. + + :param firecrest_url: FirecREST's URL + :param authorization: the authorization object. This object is responsible + of handling the credentials and the only requirement + for it is that it has a method get_access_token() + that returns a valid access token. + :param verify: either a boolean, in which case it controls whether + requests will verify the server’s TLS certificate, + or a string, in which case it must be a path to a CA bundle + to use + """ + + TOO_MANY_REQUESTS_CODE = 429 + + def _retry_requests(func): + async def wrapper(*args, **kwargs): + client = args[0] + num_retries = 0 + resp = await func(*args, **kwargs) + while True: + if resp.status_code != client.TOO_MANY_REQUESTS_CODE: + break + elif ( + client.num_retries_rate_limit is not None + and num_retries >= client.num_retries_rate_limit + ): + client.log( + logging.DEBUG, + f"Rate limit is reached and the request has " + f"been retried already {num_retries} times" + ) + break + else: + reset = resp.headers.get( + "Retry-After", + default=resp.headers.get( + "RateLimit-Reset", default=10 + ), + ) + reset = parse_retry_after(reset, client.log) + client.log( + logging.INFO, + f"Rate limit is reached, will sleep for " + f"{reset} seconds and try again" + ) + await asyncio.sleep(reset) + resp = await func(*args, **kwargs) + num_retries += 1 + + return resp + + return wrapper + + def __init__( + self, + firecrest_url: str, + authorization: Any, + verify: str | bool | ssl.SSLContext = True, + ) -> None: + self._firecrest_url = firecrest_url + self._authorization = authorization + self._verify = verify + #: This attribute will be passed to all the requests that will be made. + #: How many seconds to wait for the server to send data before giving + # up. 
After that time a `requests.exceptions.Timeout` error will be + # raised. + #: + #: It can be a float or a tuple. More details here: + # https://www.python-httpx.org/advanced/#fine-tuning-the-configuration. + self.timeout: Any = None + # type is Any because of some incompatibility between httpx and + # requests library + + #: Disable all logging from the client. + self.disable_client_logging: bool = False + #: Number of retries in case the rate limit is reached. When it is set + # to `None`, the client will keep trying until it gets a different + # status code than 429. + self.num_retries_rate_limit: Optional[int] = None + self._api_version: Version = parse("2.0.0") + self._session = httpx.AsyncClient(verify=self._verify) + + def set_api_version(self, api_version: str) -> None: + """Set the version of the api of firecrest. By default it will be + assumed that you are using version 2.0.0 or compatible. The version is + parsed by the `packaging` library. + """ + self._api_version = parse(api_version) + + async def close_session(self) -> None: + """Close the httpx session""" + await self._session.aclose() + + async def create_new_session(self) -> None: + """Create a new httpx session""" + if not self._session.is_closed: + await self._session.aclose() + + self._session = httpx.AsyncClient(verify=self._verify) + + @property + def is_session_closed(self) -> bool: + """Check if the httpx session is closed""" + return self._session.is_closed + + def log(self, level: int, msg: Any) -> None: + """Log a message with the given level on the client logger. + """ + if not self.disable_client_logging: + logger.log(level, msg) + + @_retry_requests # type: ignore + async def _get_request( + self, + endpoint, + additional_headers=None, + params=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.DEBUG, f"Making GET request to {endpoint}") + with time_block(f"GET request to {endpoint}", logger): + resp = await self._session.get( + url=url, headers=headers, params=params, timeout=self.timeout + ) + + return resp + + @_retry_requests # type: ignore + async def _post_request( + self, endpoint, additional_headers=None, params=None, data=None, files=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.DEBUG, f"Making POST request to {endpoint}") + with time_block(f"POST request to {endpoint}", logger): + resp = await self._session.post( + url=url, + headers=headers, + params=params, + data=data, + files=files, + timeout=self.timeout + ) + + return resp + + @_retry_requests # type: ignore + async def _put_request( + self, endpoint, additional_headers=None, data=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.DEBUG, f"Making PUT request to {endpoint}") + with time_block(f"PUT request to {endpoint}", logger): + resp = await self._session.put( + url=url, headers=headers, data=data, timeout=self.timeout + ) + + return resp + + @_retry_requests # type: ignore + async def _delete_request( + self, endpoint, additional_headers=None, params=None, data=None + ) 
-> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.INFO, f"Making DELETE request to {endpoint}") + with time_block(f"DELETE request to {endpoint}", logger): + # httpx doesn't support data in the `delete` method so we will + # have to use the generic `request` method + # https://www.python-httpx.org/compatibility/#request-body-on-http-methods + resp = await self._session.request( + method="DELETE", + url=url, + headers=headers, + params=params, + data=data, + timeout=self.timeout, + ) + + return resp + + def _check_response( + self, + response: httpx.Response, + expected_status_code: int, + return_json: bool = True + ) -> dict: + status_code = response.status_code + # handle_response(response) + if status_code != expected_status_code: + self.log( + logging.DEBUG, + f"Unexpected status of last request {status_code}, it " + f"should have been {expected_status_code}" + ) + raise UnexpectedStatusException( + [response], expected_status_code + ) + + return response.json() if return_json and status_code != 204 else {} + + async def systems(self) -> List[dict]: + """Returns available systems. + + :calls: GET `/status/systems` + """ + resp = await self._get_request(endpoint="/status/systems") + return self._check_response(resp, 200)['systems'] + + async def nodes( + self, + system_name: str + ) -> List[dict]: + """Returns nodes of the system. + + :param system_name: the system name where the nodes belong to + :calls: GET `/status/{system_name}/nodes` + """ + resp = await self._get_request( + endpoint=f"/status/{system_name}/nodes" + ) + return self._check_response(resp, 200)['nodes'] + + async def reservations( + self, + system_name: str + ) -> List[dict]: + """Returns reservations defined in the system. + + :param system_name: the system name where the reservations belong to + :calls: GET `/status/{system_name}/reservations` + """ + resp = await self._get_request( + endpoint=f"/status/{system_name}/reservations" + ) + return self._check_response(resp, 200)['reservations'] + + async def partitions( + self, + system_name: str + ) -> List[dict]: + """Returns partitions defined in the scheduler of the system. + + :param system_name: the system name where the partitions belong to + :calls: GET `/status/{system_name}/partitions` + """ + resp = await self._get_request( + endpoint=f"/status/{system_name}/partitions" + ) + return self._check_response(resp, 200)["partitions"] + + async def userinfo( + self, + system_name: str + ) -> dict: + """Returns user and groups information. + + :calls: GET `/status/{system_name}/userinfo` + """ + resp = await self._get_request( + endpoint=f"/status/{system_name}/userinfo" + ) + return self._check_response(resp, 200) + + async def list_files( + self, + system_name: str, + path: str, + show_hidden: bool = False, + recursive: bool = False, + numeric_uid: bool = False, + dereference: bool = False + ) -> List[dict]: + """Returns a list of files in a directory. 
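+
+        A minimal usage sketch, assuming an initialized async ``client``
+        (the system name and path below are placeholder values)::
+
+            files = await client.list_files("cluster", "/home/test_user")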
+ + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path + :param show_hidden: Show hidden files + :param recursive: recursively list directories encountered + :param dereference: when showing file information for a symbolic link, + show information for the file the link references + rather than for the link itself + :calls: GET `/filesystem/{system_name}/ops/ls` + """ + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/ls", + params={ + "path": path, + "showHidden": show_hidden, + "recursive": recursive, + "numericUid": numeric_uid, + "dereference": dereference + } + ) + return self._check_response(resp, 200)["output"] + + async def head( + self, + system_name: str, + path: str, + num_bytes: Optional[int] = None, + num_lines: Optional[int] = None, + exclude_trailing: bool = False, + ) -> List[dict]: + """Display the beginning of a specified file. + By default 10 lines will be returned. + `num_bytes` and `num_lines` cannot be specified simultaneously. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param num_bytes: the output will be the first NUM bytes of each file + :param num_lines: the output will be the first NUM lines of each file + :param exclude_trailing: the output will be the whole file, without + the last NUM bytes/lines of each file. NUM + should be specified in the respective + argument through ``bytes`` or ``lines``. + :calls: GET `/filesystem/{system_name}/ops/head` + """ + # Validate that num_bytes and num_lines are not passed together + if num_bytes is not None and num_lines is not None: + raise ValueError( + "You cannot specify both `num_bytes` and `num_lines`." + ) + + # If `exclude_trailing` is passed, either `num_bytes` or `num_lines` + # must be passed + if exclude_trailing and num_bytes is None and num_lines is None: + raise ValueError( + "`exclude_trailing` requires either `num_bytes` or " + "`num_lines` to be specified.") + + params = { + "path": path, + "skipEnding": exclude_trailing + } + if num_bytes is not None: + params["bytes"] = num_bytes + + if num_lines is not None: + params["lines"] = num_lines + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/head", + params=params + ) + return self._check_response(resp, 200)['output'] + + async def tail( + self, + system_name: str, + path: str, + num_bytes: Optional[int] = None, + num_lines: Optional[int] = None, + exclude_beginning: bool = False, + ) -> List[dict]: + """Display the ending of a specified file. + By default, 10 lines will be returned. + `num_bytes` and `num_lines` cannot be specified simultaneously. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param num_bytes: The output will be the last NUM bytes of each file + :param num_lines: The output will be the last NUM lines of each file + :param exclude_beginning: The output will be the whole file, without + the first NUM bytes/lines of each file. NUM + should be specified in the respective + argument through ``num_bytes`` or + ``num_lines``. + :calls: GET `/filesystem/{system_name}/ops/tail` + """ + # Ensure `num_bytes` and `num_lines` are not passed together + if num_bytes is not None and num_lines is not None: + raise ValueError( + "You cannot specify both `num_bytes` and `num_lines`." 
+ ) + + # If `exclude_beginning` is passed, either `num_bytes` or `num_lines` + # must be passed + if exclude_beginning and num_bytes is None and num_lines is None: + raise ValueError( + "`exclude_beginning` requires either `num_bytes` or " + "`num_lines` to be specified." + ) + + params = { + "path": path, + "skipBeginning": exclude_beginning + } + if num_bytes is not None: + params["bytes"] = num_bytes + + if num_lines is not None: + params["lines"] = num_lines + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/tail", + params=params + ) + return self._check_response(resp, 200)['output'] + + async def view( + self, + system_name: str, + path: str, + ) -> str: + """ + View full file content (up to 5MB files) + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :calls: GET `/filesystem/{system_name}/ops/view` + """ + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/view", + params={"path": path} + ) + return self._check_response(resp, 200)["output"] + + async def checksum( + self, + system_name: str, + path: str, + ) -> dict: + """ + Calculate the SHA256 (256-bit) checksum of a specified file. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :calls: GET `/filesystem/{system_name}/ops/checksum` + """ + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/checksum", + params={"path": path} + ) + return self._check_response(resp, 200)["output"] + + async def file_type( + self, + system_name: str, + path: str, + ) -> str: + """ + Uses the `file` linux application to determine the type of a file. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :calls: GET `/filesystem/{system_name}/ops/file` + """ + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/file", + params={"path": path} + ) + return self._check_response(resp, 200)["output"] + + async def chmod( + self, + system_name: str, + path: str, + mode: str + ) -> dict: + """Changes the file mod bits of a given file according to the + specified mode. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param mode: same as numeric mode of linux chmod tool + :calls: PUT `/filesystem/{system_name}/ops/chmod` + """ + data: dict[str, str] = { + "path": path, + "mode": mode + } + resp = await self._put_request( + endpoint=f"/filesystem/{system_name}/ops/chmod", + data=json.dumps(data) + ) + return self._check_response(resp, 200)["output"] + + async def chown( + self, + system_name: str, + path: str, + owner: str, + group: str + ) -> dict: + """Changes the user and/or group ownership of a given file. + If only owner or group information is passed, only that information + will be updated. 
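+
+        A minimal usage sketch, assuming an initialized async ``client``
+        (system name, path, owner and group below are placeholder values)::
+
+            await client.chown(
+                "cluster", "/home/test_user/file.txt",
+                owner="test_user", group="test_group"
+            )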
+ + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param owner: owner ID for target + :param group: group ID for target + :calls: PUT `/filesystem/{system_name}/ops/chown` + """ + data: dict[str, str] = { + "path": path, + "owner": owner, + "group": group + } + resp = await self._put_request( + endpoint=f"/filesystem/{system_name}/ops/chown", + data=json.dumps(data) + ) + return self._check_response(resp, 200)["output"] + + async def stat( + self, + system_name: str, + path: str, + dereference: bool = False, + ) -> dict: + """ + Uses the stat linux application to determine the status of a file on + the system's filesystem. The result follows: + https://docs.python.org/3/library/os.html#os.stat_result. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path + :param dereference: follow symbolic links + :calls: GET `/filesystem/{system_name}/ops/stat` + """ + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/stat", + params={ + "path": path, + "dereference": dereference + } + ) + return self._check_response(resp, 200)["output"] + + async def symlink( + self, + system_name: str, + source_path: str, + link_path: str, + ) -> dict: + """Create a symbolic link. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute path to the file the link points to + :param link_path: the absolute path to the symlink + + :calls: POST `/filesystem/{system_name}/ops/symlink` + """ + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/ops/symlink", + data=json.dumps({ + "sourcePath": source_path, + "linkPath": link_path + }) + ) + return self._check_response(resp, 201) + + async def mkdir( + self, + system_name: str, + path: str, + create_parents: bool = False + ) -> dict: + """Create a directory. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute path to the new directory + :param create_parents: create intermediate parent directories + + :calls: POST `/filesystem/{system_name}/ops/mkdir` + """ + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/ops/mkdir", + data=json.dumps({ + "sourcePath": path, + "parent": create_parents + }) + ) + return self._check_response(resp, 201) + + async def mv( + self, + system_name: str, + source_path: str, + target_path: str, + blocking: bool = False + ) -> dict: + """Rename/move a file, directory, or symlink at the `source_path` to + the `target_path` on `system_name`'s filesystem. + This operation runs in a job. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute source path + :param target_path: the absolute target path + :param blocking: whether to wait for the job to complete + + :calls: POST `/filesystem/{system_name}/transfer/mv` + """ + data: dict[str, str] = { + "sourcePath": source_path, + "targetPath": target_path + } + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/mv", + data=json.dumps(data) + ) + job_info = self._check_response(resp, 201) + + if blocking: + await self._wait_for_transfer_job(job_info) + + return job_info + + async def compress( + self, + system_name: str, + source_path: str, + target_path: str, + dereference: bool = False + ) -> None: + """Compress a directory or file. 
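+
+        A minimal usage sketch, assuming an initialized async ``client``
+        (the system name and paths below are placeholder values)::
+
+            await client.compress(
+                "cluster", "/home/test_user/dir", "/home/test_user/dir.tar.gz"
+            )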
+ + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute path to source directory + :param target_path: the absolute path to the newly created + compressed file + :param dereference: dereference links when compressing + :calls: POST `/filesystem/{system_name}/ops/compress` + """ + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/ops/compress", + data=json.dumps({ + "source_path": source_path, + "target_path": target_path, + "dereference": dereference + }) + ) + self._check_response(resp, 204) + + async def extract( + self, + system_name: str, + source_path: str, + target_path: str, + ) -> None: + """Extract tar gzip archives. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute path to the archive + :param target_path: the absolute path to target directory + :calls: POST `/filesystem/{system_name}/ops/extract` + """ + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/ops/extract", + data=json.dumps({ + "source_path": source_path, + "target_path": target_path + }) + ) + self._check_response(resp, 204) + + async def _wait_for_transfer_job(self, job_info): + job_id = job_info["transferJob"]["jobId"] + system_name = job_info["transferJob"]["system"] + for i in sleep_generator(): + try: + job = await self.job_info(system_name, job_id) + except FirecrestException as e: + if e.responses[-1].status_code == 404 and "Job not found" in e.responses[-1].json()['message']: + await asyncio.sleep(i) + continue + + state = job[0]["state"]["current"] + if isinstance(state, list): + state = ",".join(state) + + if slurm_state_completed(state): + break + + await asyncio.sleep(i) + + # TODO: Check if the job was successful + + stdout_file = await self.view(system_name, job_info["transferJob"]["logs"]["outputLog"]) + if ( + "Files were successfully" not in stdout_file and + "File was successfully" not in stdout_file and + "Multipart file upload successfully completed" not in stdout_file + ): + raise TransferJobFailedException(job_info) + + async def cp( + self, + system_name: str, + source_path: str, + target_path: str, + blocking: bool = False + ) -> dict: + """Copies file from `source_path` to `target_path`. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute source path + :param target_path: the absolute target path + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/cp` + """ + data: dict[str, str] = { + "sourcePath": source_path, + "targetPath": target_path + } + + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/cp", + data=json.dumps(data) + ) + job_info = self._check_response(resp, 201) + + if blocking: + await self._wait_for_transfer_job(job_info) + + return job_info + + async def rm( + self, + system_name: str, + path: str, + blocking: bool = False + ) -> dict: + """Delete a file. 
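+
+        A minimal usage sketch, assuming an initialized async ``client``
+        (the system name and path below are placeholder values)::
+
+            transfer = await client.rm(
+                "cluster", "/home/test_user/file.txt", blocking=True
+            )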
+ + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path + :calls: DELETE `/filesystem/{system_name}/transfer/rm` + """ + resp = await self._delete_request( + endpoint=f"/filesystem/{system_name}/transfer/rm", + params={"path": path} + ) + # self._check_response(resp, 204) + + job_info = self._check_response(resp, 200) + + if blocking: + await self._wait_for_transfer_job(job_info) + + return job_info + + async def upload( + self, + system_name: str, + local_file: str | pathlib.Path | BytesIO, + directory: str, + filename: str, + blocking: bool = False + ) -> dict: + """Upload a file to the system. The user uploads a file to the + staging area Object storage) of FirecREST and it will be moved + to the target directory in a job. + + :param system_name: the system name where the filesystem belongs to + :param local_file: the local file's path to be uploaded (can be + relative) + :param source_path: the absolut target path of the directory where the + file will be uploaded + :param filename: the name of the file in the target directory + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/upload` + """ + # TODO check if the file exists locally + + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/upload", + data=json.dumps({ + "source_path": directory, + "fileName": filename + }) + ) + + transfer_info = self._check_response(resp, 201) + # Upload the file + async with aiofiles.open(local_file, "rb") as f: + data = await f.read() # TODO this will fail for large files + await self._session.put( + url=transfer_info["uploadUrl"], + data=data # type: ignore + ) + + if blocking: + await self._wait_for_transfer_job(transfer_info) + + return transfer_info + + async def download( + self, + system_name: str, + source_path: str, + target_path: str, + blocking: bool = False + ) -> dict: + """Download a file from the remote system. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute source path of the file + :param target_path: the target path in the local filesystem (can + be relative path) + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/upload` + """ + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/download", + data=json.dumps({ + "source_path": source_path, + }) + ) + + transfer_info = self._check_response(resp, 201) + if blocking: + await self._wait_for_transfer_job(transfer_info) + + # Download the file + async with aiofiles.open(target_path, "wb") as f: + # TODO this will fail for large files + resp = await self._session.get( + url=transfer_info["downloadUrl"], + ) + await f.write(resp.content) + + return transfer_info + + async def submit( + self, + system_name: str, + working_dir: str, + script_str: Optional[str] = None, + script_local_path: Optional[str] = None, + env_vars: Optional[dict[str, str]] = None, + ) -> dict: + """Submit a job. 
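+
+        A minimal usage sketch, assuming an initialized async ``client``
+        (the system name, working directory and script path below are
+        placeholder values)::
+
+            job = await client.submit(
+                "cluster",
+                working_dir="/home/test_user",
+                script_local_path="script.sh",
+            )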
+ + :param system_name: the system name where the filesystem belongs to + :param working_dir: the working directory of the job + :param script_str: the job script + :param script_local_path: path to the job script + :param env_vars: environment variables to be set before running the + job + :calls: POST `/compute/{system_name}/jobs` + """ + + if sum( + arg is not None for arg in [script_str, script_local_path] + ) != 1: + raise ValueError( + "Exactly one of the arguments `script_str` or " + "`script_local_path` must be set." + ) + + if script_local_path: + if not os.path.isfile(script_local_path): + raise FileNotFoundError( + f"Script file not found: {script_local_path}" + ) + with open(script_local_path) as file: + script_str = file.read() + + data: dict[str, dict[str, Any]] = { + "job": { + "script": script_str, + "working_directory": working_dir + } + } + if env_vars: + data["job"]["env"] = env_vars + + resp = await self._post_request( + endpoint=f"/compute/{system_name}/jobs", + data=json.dumps(data) + ) + return self._check_response(resp, 201) + + async def job_info( + self, + system_name: str, + jobid: Optional[str] = None + ) -> dict: + """Get job information. When the job is not specified, it will return + all the jobs. + + :param system_name: the system name where the filesystem belongs to + :param job: the ID of the job + :calls: GET `/compute/{system_name}/jobs` or + GET `/compute/{system_name}/jobs/{job}` + """ + url = f"/compute/{system_name}/jobs" + url = f"{url}/{jobid}" if jobid else url + + resp = await self._get_request( + endpoint=url, + ) + return self._check_response(resp, 200)["jobs"] + + async def job_metadata( + self, + system_name: str, + jobid: str, + ) -> dict: + """Get job metadata. + + :param system_name: the system name where the filesystem belongs to + :param jobid: the ID of the job + :calls: GET `/compute/{system_name}/jobs/{jobid}/metadata` + """ + resp = await self._get_request( + endpoint=f"/compute/{system_name}/jobs/{jobid}/metadata", + ) + return self._check_response(resp, 200)['jobs'] + + async def cancel_job( + self, + system_name: str, + jobid: str, + ) -> dict: + """Cancel a job. + + :param system_name: the system name where the filesystem belongs to + :param jobid: the ID of the job to be cancelled + :calls: DELETE `/compute/{system_name}/jobs/{jobid}` + """ + resp = await self._delete_request( + endpoint=f"/compute/{system_name}/jobs/{jobid}", + ) + return self._check_response(resp, 204) + + async def attach_to_job( + self, + system_name: str, + jobid: str, + command: str, + ) -> dict: + """Attach a process to a job. + + :param system_name: the system name where the filesystem belongs to + :param jobid: the ID of the job + :param command: the command to be executed + :calls: PUT `/compute/{system_name}/jobs/{jobid}/attach` + """ + resp = await self._put_request( + endpoint=f"/compute/{system_name}/jobs/{jobid}/attach", + data=json.dumps({"command": command}) + ) + return self._check_response(resp, 204) diff --git a/firecrest/v2/_async/__init__.py b/firecrest/v2/_async/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/firecrest/v2/_sync/Client.py b/firecrest/v2/_sync/Client.py new file mode 100644 index 0000000..45d4883 --- /dev/null +++ b/firecrest/v2/_sync/Client.py @@ -0,0 +1,1028 @@ +# +# Copyright (c) 2024, ETH Zurich. All rights reserved. +# +# Please, refer to the LICENSE file in the root directory. 
+# SPDX-License-Identifier: BSD-3-Clause +# +from __future__ import annotations + +import httpx +import json +import logging +import os +import pathlib +import ssl +import time + +from io import BytesIO +from packaging.version import Version, parse +from typing import Any, Optional, List + +from firecrest.utilities import ( + parse_retry_after, slurm_state_completed, time_block +) +from firecrest.FirecrestException import ( + FirecrestException, + TransferJobFailedException, + UnexpectedStatusException +) + + +logger = logging.getLogger(__name__) + + +# This function is temporarily here +def handle_response(response): + print("\nResponse status code:") + print(response.status_code) + print("\nResponse headers:") + print(json.dumps(dict(response.headers), indent=4)) + print("\nResponse json:") + try: + print(json.dumps(response.json(), indent=4)) + except json.JSONDecodeError: + print("-") + + +def sleep_generator(): + yield 0.2 # First yield 2 seconds because the api takes time to update + value = 0.5 + while True: + yield value + value *= 2 # Double the value for each iteration + + +class Firecrest: + """ + This is the basic class you instantiate to access the FirecREST API v2. + Necessary parameters are the firecrest URL and an authorization object. + + :param firecrest_url: FirecREST's URL + :param authorization: the authorization object. This object is responsible + of handling the credentials and the only requirement + for it is that it has a method get_access_token() + that returns a valid access token. + :param verify: either a boolean, in which case it controls whether + requests will verify the server’s TLS certificate, + or a string, in which case it must be a path to a CA bundle + to use + """ + + TOO_MANY_REQUESTS_CODE = 429 + + def _retry_requests(func): + def wrapper(*args, **kwargs): + client = args[0] + num_retries = 0 + resp = func(*args, **kwargs) + while True: + if resp.status_code != client.TOO_MANY_REQUESTS_CODE: + break + elif ( + client.num_retries_rate_limit is not None + and num_retries >= client.num_retries_rate_limit + ): + client.log( + logging.DEBUG, + f"Rate limit is reached and the request has " + f"been retried already {num_retries} times" + ) + break + else: + reset = resp.headers.get( + "Retry-After", + default=resp.headers.get( + "RateLimit-Reset", default=10 + ), + ) + reset = parse_retry_after(reset, client.log) + client.log( + logging.INFO, + f"Rate limit is reached, will sleep for " + f"{reset} seconds and try again" + ) + time.sleep(reset) + resp = func(*args, **kwargs) + num_retries += 1 + + return resp + + return wrapper + + def __init__( + self, + firecrest_url: str, + authorization: Any, + verify: str | bool | ssl.SSLContext = True, + ) -> None: + self._firecrest_url = firecrest_url + self._authorization = authorization + self._verify = verify + #: This attribute will be passed to all the requests that will be made. + #: How many seconds to wait for the server to send data before giving + # up. After that time a `requests.exceptions.Timeout` error will be + # raised. + #: + #: It can be a float or a tuple. More details here: + # https://www.python-httpx.org/advanced/#fine-tuning-the-configuration. + self.timeout: Any = None + # type is Any because of some incompatibility between httpx and + # requests library + + #: Disable all logging from the client. + self.disable_client_logging: bool = False + #: Number of retries in case the rate limit is reached. 
When it is set + # to `None`, the client will keep trying until it gets a different + # status code than 429. + self.num_retries_rate_limit: Optional[int] = None + self._api_version: Version = parse("2.0.0") + self._session = httpx.Client(verify=self._verify) + + def set_api_version(self, api_version: str) -> None: + """Set the version of the api of firecrest. By default it will be + assumed that you are using version 2.0.0 or compatible. The version is + parsed by the `packaging` library. + """ + self._api_version = parse(api_version) + + def close_session(self) -> None: + """Close the httpx session""" + self._session.close() + + def create_new_session(self) -> None: + """Create a new httpx session""" + if not self._session.is_closed: + self._session.close() + + self._session = httpx.Client(verify=self._verify) + + @property + def is_session_closed(self) -> bool: + """Check if the httpx session is closed""" + return self._session.is_closed + + def log(self, level: int, msg: Any) -> None: + """Log a message with the given level on the client logger. + """ + if not self.disable_client_logging: + logger.log(level, msg) + + @_retry_requests # type: ignore + def _get_request( + self, + endpoint, + additional_headers=None, + params=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.DEBUG, f"Making GET request to {endpoint}") + with time_block(f"GET request to {endpoint}", logger): + resp = self._session.get( + url=url, headers=headers, params=params, timeout=self.timeout + ) + + return resp + + @_retry_requests # type: ignore + def _post_request( + self, endpoint, additional_headers=None, params=None, data=None, files=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.DEBUG, f"Making POST request to {endpoint}") + with time_block(f"POST request to {endpoint}", logger): + resp = self._session.post( + url=url, + headers=headers, + params=params, + data=data, + files=files, + timeout=self.timeout + ) + + return resp + + @_retry_requests # type: ignore + def _put_request( + self, endpoint, additional_headers=None, data=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.DEBUG, f"Making PUT request to {endpoint}") + with time_block(f"PUT request to {endpoint}", logger): + resp = self._session.put( + url=url, headers=headers, data=data, timeout=self.timeout + ) + + return resp + + @_retry_requests # type: ignore + def _delete_request( + self, endpoint, additional_headers=None, params=None, data=None + ) -> httpx.Response: + url = f"{self._firecrest_url}{endpoint}" + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + self.log(logging.INFO, f"Making DELETE request to {endpoint}") + with time_block(f"DELETE request to {endpoint}", logger): + # httpx doesn't support data in the `delete` method so we will + # have to use the generic `request` method + # https://www.python-httpx.org/compatibility/#request-body-on-http-methods + resp = 
self._session.request( + method="DELETE", + url=url, + headers=headers, + params=params, + data=data, + timeout=self.timeout, + ) + + return resp + + def _check_response( + self, + response: httpx.Response, + expected_status_code: int, + return_json: bool = True + ) -> dict: + status_code = response.status_code + # handle_response(response) + if status_code != expected_status_code: + self.log( + logging.DEBUG, + f"Unexpected status of last request {status_code}, it " + f"should have been {expected_status_code}" + ) + raise UnexpectedStatusException( + [response], expected_status_code + ) + + return response.json() if return_json and status_code != 204 else {} + + def systems(self) -> List[dict]: + """Returns available systems. + + :calls: GET `/status/systems` + """ + resp = self._get_request(endpoint="/status/systems") + return self._check_response(resp, 200)['systems'] + + def nodes( + self, + system_name: str + ) -> List[dict]: + """Returns nodes of the system. + + :param system_name: the system name where the nodes belong to + :calls: GET `/status/{system_name}/nodes` + """ + resp = self._get_request( + endpoint=f"/status/{system_name}/nodes" + ) + return self._check_response(resp, 200)['nodes'] + + def reservations( + self, + system_name: str + ) -> List[dict]: + """Returns reservations defined in the system. + + :param system_name: the system name where the reservations belong to + :calls: GET `/status/{system_name}/reservations` + """ + resp = self._get_request( + endpoint=f"/status/{system_name}/reservations" + ) + return self._check_response(resp, 200)['reservations'] + + def partitions( + self, + system_name: str + ) -> List[dict]: + """Returns partitions defined in the scheduler of the system. + + :param system_name: the system name where the partitions belong to + :calls: GET `/status/{system_name}/partitions` + """ + resp = self._get_request( + endpoint=f"/status/{system_name}/partitions" + ) + return self._check_response(resp, 200)["partitions"] + + def userinfo( + self, + system_name: str + ) -> dict: + """Returns user and groups information. + + :calls: GET `/status/{system_name}/userinfo` + """ + resp = self._get_request( + endpoint=f"/status/{system_name}/userinfo" + ) + return self._check_response(resp, 200) + + def list_files( + self, + system_name: str, + path: str, + show_hidden: bool = False, + recursive: bool = False, + numeric_uid: bool = False, + dereference: bool = False + ) -> List[dict]: + """Returns a list of files in a directory. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path + :param show_hidden: Show hidden files + :param recursive: recursively list directories encountered + :param dereference: when showing file information for a symbolic link, + show information for the file the link references + rather than for the link itself + :calls: GET `/filesystem/{system_name}/ops/ls` + """ + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/ls", + params={ + "path": path, + "showHidden": show_hidden, + "recursive": recursive, + "numericUid": numeric_uid, + "dereference": dereference + } + ) + return self._check_response(resp, 200)["output"] + + def head( + self, + system_name: str, + path: str, + num_bytes: Optional[int] = None, + num_lines: Optional[int] = None, + exclude_trailing: bool = False, + ) -> List[dict]: + """Display the beginning of a specified file. + By default 10 lines will be returned. + `num_bytes` and `num_lines` cannot be specified simultaneously. 
+ + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param num_bytes: the output will be the first NUM bytes of each file + :param num_lines: the output will be the first NUM lines of each file + :param exclude_trailing: the output will be the whole file, without + the last NUM bytes/lines of each file. NUM + should be specified in the respective + argument through ``bytes`` or ``lines``. + :calls: GET `/filesystem/{system_name}/ops/head` + """ + # Validate that num_bytes and num_lines are not passed together + if num_bytes is not None and num_lines is not None: + raise ValueError( + "You cannot specify both `num_bytes` and `num_lines`." + ) + + # If `exclude_trailing` is passed, either `num_bytes` or `num_lines` + # must be passed + if exclude_trailing and num_bytes is None and num_lines is None: + raise ValueError( + "`exclude_trailing` requires either `num_bytes` or " + "`num_lines` to be specified.") + + params = { + "path": path, + "skipEnding": exclude_trailing + } + if num_bytes is not None: + params["bytes"] = num_bytes + + if num_lines is not None: + params["lines"] = num_lines + + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/head", + params=params + ) + return self._check_response(resp, 200)['output'] + + def tail( + self, + system_name: str, + path: str, + num_bytes: Optional[int] = None, + num_lines: Optional[int] = None, + exclude_beginning: bool = False, + ) -> List[dict]: + """Display the ending of a specified file. + By default, 10 lines will be returned. + `num_bytes` and `num_lines` cannot be specified simultaneously. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param num_bytes: The output will be the last NUM bytes of each file + :param num_lines: The output will be the last NUM lines of each file + :param exclude_beginning: The output will be the whole file, without + the first NUM bytes/lines of each file. NUM + should be specified in the respective + argument through ``num_bytes`` or + ``num_lines``. + :calls: GET `/filesystem/{system_name}/ops/tail` + """ + # Ensure `num_bytes` and `num_lines` are not passed together + if num_bytes is not None and num_lines is not None: + raise ValueError( + "You cannot specify both `num_bytes` and `num_lines`." + ) + + # If `exclude_beginning` is passed, either `num_bytes` or `num_lines` + # must be passed + if exclude_beginning and num_bytes is None and num_lines is None: + raise ValueError( + "`exclude_beginning` requires either `num_bytes` or " + "`num_lines` to be specified." 
+ ) + + params = { + "path": path, + "skipBeginning": exclude_beginning + } + if num_bytes is not None: + params["bytes"] = num_bytes + + if num_lines is not None: + params["lines"] = num_lines + + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/tail", + params=params + ) + return self._check_response(resp, 200)['output'] + + def view( + self, + system_name: str, + path: str, + ) -> str: + """ + View full file content (up to 5MB files) + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :calls: GET `/filesystem/{system_name}/ops/view` + """ + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/view", + params={"path": path} + ) + return self._check_response(resp, 200)["output"] + + def checksum( + self, + system_name: str, + path: str, + ) -> dict: + """ + Calculate the SHA256 (256-bit) checksum of a specified file. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :calls: GET `/filesystem/{system_name}/ops/checksum` + """ + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/checksum", + params={"path": path} + ) + return self._check_response(resp, 200)["output"] + + def file_type( + self, + system_name: str, + path: str, + ) -> str: + """ + Uses the `file` linux application to determine the type of a file. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :calls: GET `/filesystem/{system_name}/ops/file` + """ + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/file", + params={"path": path} + ) + return self._check_response(resp, 200)["output"] + + def chmod( + self, + system_name: str, + path: str, + mode: str + ) -> dict: + """Changes the file mod bits of a given file according to the + specified mode. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param mode: same as numeric mode of linux chmod tool + :calls: PUT `/filesystem/{system_name}/ops/chmod` + """ + data: dict[str, str] = { + "path": path, + "mode": mode + } + resp = self._put_request( + endpoint=f"/filesystem/{system_name}/ops/chmod", + data=json.dumps(data) + ) + return self._check_response(resp, 200)["output"] + + def chown( + self, + system_name: str, + path: str, + owner: str, + group: str + ) -> dict: + """Changes the user and/or group ownership of a given file. + If only owner or group information is passed, only that information + will be updated. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path of the file + :param owner: owner ID for target + :param group: group ID for target + :calls: PUT `/filesystem/{system_name}/ops/chown` + """ + data: dict[str, str] = { + "path": path, + "owner": owner, + "group": group + } + resp = self._put_request( + endpoint=f"/filesystem/{system_name}/ops/chown", + data=json.dumps(data) + ) + return self._check_response(resp, 200)["output"] + + def stat( + self, + system_name: str, + path: str, + dereference: bool = False, + ) -> dict: + """ + Uses the stat linux application to determine the status of a file on + the system's filesystem. The result follows: + https://docs.python.org/3/library/os.html#os.stat_result. 
+ + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path + :param dereference: follow symbolic links + :calls: GET `/filesystem/{system_name}/ops/stat` + """ + resp = self._get_request( + endpoint=f"/filesystem/{system_name}/ops/stat", + params={ + "path": path, + "dereference": dereference + } + ) + return self._check_response(resp, 200)["output"] + + def symlink( + self, + system_name: str, + source_path: str, + link_path: str, + ) -> dict: + """Create a symbolic link. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute path to the file the link points to + :param link_path: the absolute path to the symlink + + :calls: POST `/filesystem/{system_name}/ops/symlink` + """ + resp = self._post_request( + endpoint=f"/filesystem/{system_name}/ops/symlink", + data=json.dumps({ + "sourcePath": source_path, + "linkPath": link_path + }) + ) + return self._check_response(resp, 201) + + def mkdir( + self, + system_name: str, + path: str, + create_parents: bool = False + ) -> dict: + """Create a directory. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute path to the new directory + :param create_parents: create intermediate parent directories + + :calls: POST `/filesystem/{system_name}/ops/mkdir` + """ + resp = self._post_request( + endpoint=f"/filesystem/{system_name}/ops/mkdir", + data=json.dumps({ + "sourcePath": path, + "parent": create_parents + }) + ) + return self._check_response(resp, 201) + + def mv( + self, + system_name: str, + source_path: str, + target_path: str, + blocking: bool = False + ) -> dict: + """Rename/move a file, directory, or symlink at the `source_path` to + the `target_path` on `system_name`'s filesystem. + This operation runs in a job. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute source path + :param target_path: the absolute target path + :param blocking: whether to wait for the job to complete + + :calls: POST `/filesystem/{system_name}/transfer/mv` + """ + data: dict[str, str] = { + "sourcePath": source_path, + "targetPath": target_path + } + resp = self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/mv", + data=json.dumps(data) + ) + job_info = self._check_response(resp, 201) + + if blocking: + self._wait_for_transfer_job(job_info) + + return job_info + + def compress( + self, + system_name: str, + source_path: str, + target_path: str, + dereference: bool = False + ) -> None: + """Compress a directory or file. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute path to source directory + :param target_path: the absolute path to the newly created + compressed file + :param dereference: dereference links when compressing + :calls: POST `/filesystem/{system_name}/ops/compress` + """ + resp = self._post_request( + endpoint=f"/filesystem/{system_name}/ops/compress", + data=json.dumps({ + "source_path": source_path, + "target_path": target_path, + "dereference": dereference + }) + ) + self._check_response(resp, 204) + + def extract( + self, + system_name: str, + source_path: str, + target_path: str, + ) -> None: + """Extract tar gzip archives. 
+ + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute path to the archive + :param target_path: the absolute path to target directory + :calls: POST `/filesystem/{system_name}/ops/extract` + """ + resp = self._post_request( + endpoint=f"/filesystem/{system_name}/ops/extract", + data=json.dumps({ + "source_path": source_path, + "target_path": target_path + }) + ) + self._check_response(resp, 204) + + def _wait_for_transfer_job(self, job_info): + job_id = job_info["transferJob"]["jobId"] + system_name = job_info["transferJob"]["system"] + for i in sleep_generator(): + try: + job = self.job_info(system_name, job_id) + except FirecrestException as e: + if e.responses[-1].status_code == 404 and "Job not found" in e.responses[-1].json()['message']: + time.sleep(i) + continue + + state = job[0]["state"]["current"] + if isinstance(state, list): + state = ",".join(state) + + if slurm_state_completed(state): + break + + time.sleep(i) + + # TODO: Check if the job was successful + + stdout_file = self.view(system_name, job_info["transferJob"]["logs"]["outputLog"]) + if ( + "Files were successfully" not in stdout_file and + "File was successfully" not in stdout_file and + "Multipart file upload successfully completed" not in stdout_file + ): + raise TransferJobFailedException(job_info) + + def cp( + self, + system_name: str, + source_path: str, + target_path: str, + blocking: bool = False + ) -> dict: + """Copies file from `source_path` to `target_path`. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute source path + :param target_path: the absolute target path + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/cp` + """ + data: dict[str, str] = { + "sourcePath": source_path, + "targetPath": target_path + } + + resp = self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/cp", + data=json.dumps(data) + ) + job_info = self._check_response(resp, 201) + + if blocking: + self._wait_for_transfer_job(job_info) + + return job_info + + def rm( + self, + system_name: str, + path: str, + blocking: bool = False + ) -> dict: + """Delete a file. + + :param system_name: the system name where the filesystem belongs to + :param path: the absolute target path + :calls: DELETE `/filesystem/{system_name}/transfer/rm` + """ + resp = self._delete_request( + endpoint=f"/filesystem/{system_name}/transfer/rm", + params={"path": path} + ) + # self._check_response(resp, 204) + + job_info = self._check_response(resp, 200) + + if blocking: + self._wait_for_transfer_job(job_info) + + return job_info + + def upload( + self, + system_name: str, + local_file: str | pathlib.Path | BytesIO, + directory: str, + filename: str, + blocking: bool = False + ) -> dict: + """Upload a file to the system. The user uploads a file to the + staging area Object storage) of FirecREST and it will be moved + to the target directory in a job. 
+
+        :param system_name: the system name where the filesystem belongs to
+        :param local_file: the local file's path to be uploaded (can be
+            relative)
+        :param directory: the absolute target path of the directory where the
+            file will be uploaded
+        :param filename: the name of the file in the target directory
+        :param blocking: whether to wait for the job to complete
+        :calls: POST `/filesystem/{system_name}/transfer/upload`
+        """
+        # TODO check if the file exists locally
+
+        resp = self._post_request(
+            endpoint=f"/filesystem/{system_name}/transfer/upload",
+            data=json.dumps({
+                "source_path": directory,
+                "fileName": filename
+            })
+        )
+
+        transfer_info = self._check_response(resp, 201)
+        # Upload the file
+        # FIXME
+        with open(local_file, "rb") as f:  # type: ignore
+            data = f.read()  # TODO this will fail for large files
+            self._session.put(
+                url=transfer_info["uploadUrl"],
+                data=data  # type: ignore
+            )
+
+        if blocking:
+            self._wait_for_transfer_job(transfer_info)
+
+        return transfer_info
+
+    def download(
+        self,
+        system_name: str,
+        source_path: str,
+        target_path: str,
+        blocking: bool = False
+    ) -> dict:
+        """Download a file from the remote system.
+
+        :param system_name: the system name where the filesystem belongs to
+        :param source_path: the absolute source path of the file
+        :param target_path: the target path in the local filesystem (can
+            be relative path)
+        :param blocking: whether to wait for the job to complete
+        :calls: POST `/filesystem/{system_name}/transfer/download`
+        """
+        resp = self._post_request(
+            endpoint=f"/filesystem/{system_name}/transfer/download",
+            data=json.dumps({
+                "source_path": source_path,
+            })
+        )
+
+        transfer_info = self._check_response(resp, 201)
+        if blocking:
+            self._wait_for_transfer_job(transfer_info)
+
+        # Download the file
+        with open(target_path, "wb") as f:
+            # TODO this will fail for large files
+            resp = self._session.get(
+                url=transfer_info["downloadUrl"],
+            )
+            f.write(resp.content)
+
+        return transfer_info
+
+    def submit(
+        self,
+        system_name: str,
+        working_dir: str,
+        script_str: Optional[str] = None,
+        script_local_path: Optional[str] = None,
+        env_vars: Optional[dict[str, str]] = None,
+    ) -> dict:
+        """Submit a job.
+
+        :param system_name: the system name where the filesystem belongs to
+        :param working_dir: the working directory of the job
+        :param script_str: the job script
+        :param script_local_path: path to the job script
+        :param env_vars: environment variables to be set before running the
+            job
+        :calls: POST `/compute/{system_name}/jobs`
+        """
+
+        if sum(
+            arg is not None for arg in [script_str, script_local_path]
+        ) != 1:
+            raise ValueError(
+                "Exactly one of the arguments `script_str` or "
+                "`script_local_path` must be set."
+            )
+
+        if script_local_path:
+            if not os.path.isfile(script_local_path):
+                raise FileNotFoundError(
+                    f"Script file not found: {script_local_path}"
+                )
+            with open(script_local_path) as file:
+                script_str = file.read()
+
+        data: dict[str, dict[str, Any]] = {
+            "job": {
+                "script": script_str,
+                "working_directory": working_dir
+            }
+        }
+        if env_vars:
+            data["job"]["env"] = env_vars
+
+        resp = self._post_request(
+            endpoint=f"/compute/{system_name}/jobs",
+            data=json.dumps(data)
+        )
+        return self._check_response(resp, 201)
+
+    def job_info(
+        self,
+        system_name: str,
+        jobid: Optional[str] = None
+    ) -> dict:
+        """Get job information. When the job is not specified, it will return
+        all the jobs.
+ + :param system_name: the system name where the filesystem belongs to + :param job: the ID of the job + :calls: GET `/compute/{system_name}/jobs` or + GET `/compute/{system_name}/jobs/{job}` + """ + url = f"/compute/{system_name}/jobs" + url = f"{url}/{jobid}" if jobid else url + + resp = self._get_request( + endpoint=url, + ) + return self._check_response(resp, 200)["jobs"] + + def job_metadata( + self, + system_name: str, + jobid: str, + ) -> dict: + """Get job metadata. + + :param system_name: the system name where the filesystem belongs to + :param jobid: the ID of the job + :calls: GET `/compute/{system_name}/jobs/{jobid}/metadata` + """ + resp = self._get_request( + endpoint=f"/compute/{system_name}/jobs/{jobid}/metadata", + ) + return self._check_response(resp, 200)['jobs'] + + def cancel_job( + self, + system_name: str, + jobid: str, + ) -> dict: + """Cancel a job. + + :param system_name: the system name where the filesystem belongs to + :param jobid: the ID of the job to be cancelled + :calls: DELETE `/compute/{system_name}/jobs/{jobid}` + """ + resp = self._delete_request( + endpoint=f"/compute/{system_name}/jobs/{jobid}", + ) + return self._check_response(resp, 204) + + def attach_to_job( + self, + system_name: str, + jobid: str, + command: str, + ) -> dict: + """Attach a process to a job. + + :param system_name: the system name where the filesystem belongs to + :param jobid: the ID of the job + :param command: the command to be executed + :calls: PUT `/compute/{system_name}/jobs/{jobid}/attach` + """ + resp = self._put_request( + endpoint=f"/compute/{system_name}/jobs/{jobid}/attach", + data=json.dumps({"command": command}) + ) + return self._check_response(resp, 204) diff --git a/pyproject.toml b/pyproject.toml index 928c4d4..3426a70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ ] requires-python = ">=3.7" dependencies = [ + "aiofiles~=23.2.1", "requests>=2.14.0", "PyJWT>=2.4.0", "typer[all]~=0.7.0", @@ -46,6 +47,7 @@ test = [ "pytest>=5.3", "flake8~=5.0", "mypy~=0.991", + "types-aiofiles~=23.2.0.0", "types-requests~=2.28.11", "pytest-httpserver~=1.0.6", "pytest-asyncio>=0.21.1", @@ -58,6 +60,9 @@ docs = [ "sphinx-autobuild>=2021.0", "sphinx-click==3.0.2" ] +dev = [ + "unasync" +] [tool.mypy] show_error_codes = true diff --git a/tests/test_compute.py b/tests/test_compute.py index da8492f..916286f 100644 --- a/tests/test_compute.py +++ b/tests/test_compute.py @@ -20,7 +20,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.polling_sleep_times = [0, 0, 0] @@ -45,7 +45,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.set_api_version("1.16.0") diff --git a/tests/test_compute_async.py b/tests/test_compute_async.py index d689517..b468b85 100644 --- a/tests/test_compute_async.py +++ b/tests/test_compute_async.py @@ -12,7 +12,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.time_between_calls = { @@ -34,7 +34,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = 
firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.time_between_calls = { diff --git a/tests/test_extras.py b/tests/test_extras.py index cf1241b..7d9eb51 100644 --- a/tests/test_extras.py +++ b/tests/test_extras.py @@ -44,7 +44,7 @@ def get_access_token(self): # } return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsiZmlyZWNyZXN0LXNhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYm9iLWNsaWVudCI6eyJyb2xlcyI6WyJib2IiXX19LCJjbGllbnRJZCI6ImJvYi1jbGllbnQiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtYm9iLWNsaWVudCJ9.XfCXDclEBh7faQrOF2piYdnb7c3AUiCxDesTkNSwpSY" - return firecrest.Firecrest( + return firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) @@ -64,7 +64,7 @@ def get_access_token(self): # } return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib3RoZXItcm9sZSJdfSwicHJlZmVycmVkX3VzZXJuYW1lIjoiYWxpY2UifQ.dpo1_F9jkV-RpNGqTaCNLbM-JPMnstDg7mQjzbwDp5g" - return firecrest.Firecrest( + return firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) @@ -79,7 +79,7 @@ def get_access_token(self): # } return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcmVmZXJyZWRfdXNlcm5hbWUiOiJldmUifQ.SGVPDrJdy8b5jRpxcw9ILLsf8M2ljAYWxiN0A1b_1SE" - return firecrest.Firecrest( + return firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) @@ -90,7 +90,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.set_api_version("1.16.0") @@ -114,7 +114,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.set_api_version("1.16.0") diff --git a/tests/test_extras_async.py b/tests/test_extras_async.py index a8fb43b..534049a 100644 --- a/tests/test_extras_async.py +++ b/tests/test_extras_async.py @@ -13,7 +13,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.time_between_calls = { @@ -35,7 +35,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.time_between_calls = { diff --git a/tests/test_status.py b/tests/test_status.py index ccb6cac..08187aa 100644 --- a/tests/test_status.py +++ b/tests/test_status.py @@ -21,7 +21,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.set_api_version("1.16.0") @@ -45,7 +45,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.set_api_version("1.16.0") diff --git a/tests/test_status_async.py b/tests/test_status_async.py index 626e627..35d6c0f 100644 
--- a/tests/test_status_async.py +++ b/tests/test_status_async.py @@ -12,7 +12,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.time_between_calls = { @@ -34,7 +34,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.time_between_calls = { diff --git a/tests/test_storage.py b/tests/test_storage.py index 1e9e87a..41ac6a5 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -21,7 +21,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.set_api_version("1.16.0") @@ -45,7 +45,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.set_api_version("1.16.0") @@ -651,7 +651,7 @@ def test_external_download(valid_client): external_download_retry = 0 valid_client.set_api_version("1.14.0") obj = valid_client.external_download("cluster1", "/path/to/remote/source") - assert isinstance(obj, firecrest.ExternalDownload) + assert isinstance(obj, firecrest.v1.ExternalDownload) assert obj._task_id == "external_download_id" assert obj.client == valid_client @@ -661,7 +661,7 @@ def test_external_download_legacy(valid_client): external_download_retry = 0 valid_client.set_api_version("1.13.0") obj = valid_client.external_download("cluster1", "/path/to/remote/sourcelegacy") - assert isinstance(obj, firecrest.ExternalDownload) + assert isinstance(obj, firecrest.v1.ExternalDownload) assert obj._task_id == "external_download_id_legacy" assert obj.client == valid_client @@ -716,7 +716,7 @@ def test_external_upload(valid_client): obj = valid_client.external_upload( "cluster1", "/path/to/local/source", "/path/to/remote/destination" ) - assert isinstance(obj, firecrest.ExternalUpload) + assert isinstance(obj, firecrest.v1.ExternalUpload) assert obj._task_id == "external_upload_id" assert obj.client == valid_client diff --git a/tests/test_storage_async.py b/tests/test_storage_async.py index f5ea5ae..c2e99b2 100644 --- a/tests/test_storage_async.py +++ b/tests/test_storage_async.py @@ -13,7 +13,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.time_between_calls = { @@ -35,7 +35,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.time_between_calls = { @@ -266,7 +266,7 @@ async def test_external_download(valid_client): external_download_retry = 0 valid_client.set_api_version("1.14.0") obj = await valid_client.external_download("cluster1", "/path/to/remote/source") - assert isinstance(obj, firecrest.AsyncExternalDownload) + assert isinstance(obj, firecrest.v1.AsyncExternalDownload) assert obj._task_id == 
"external_download_id" assert obj.client == valid_client @@ -277,7 +277,7 @@ async def test_external_download_legacy(valid_client): external_download_retry = 0 valid_client.set_api_version("1.13.0") obj = await valid_client.external_download("cluster1", "/path/to/remote/sourcelegacy") - assert isinstance(obj, firecrest.AsyncExternalDownload) + assert isinstance(obj, firecrest.v1.AsyncExternalDownload) assert obj._task_id == "external_download_id_legacy" assert obj.client == valid_client @@ -289,6 +289,6 @@ async def test_external_upload(valid_client): obj = await valid_client.external_upload( "cluster1", "/path/to/local/source", "/path/to/remote/destination" ) - assert isinstance(obj, firecrest.AsyncExternalUpload) + assert isinstance(obj, firecrest.v1.AsyncExternalUpload) assert obj._task_id == "external_upload_id" assert obj.client == valid_client diff --git a/tests/test_utilities.py b/tests/test_utilities.py index 6eb8019..5f637a5 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -20,7 +20,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.set_api_version("1.16.0") @@ -44,7 +44,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.Firecrest( + client = firecrest.v1.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.set_api_version("1.16.0") diff --git a/tests/test_utilities_async.py b/tests/test_utilities_async.py index c08e4a7..ee8a252 100644 --- a/tests/test_utilities_async.py +++ b/tests/test_utilities_async.py @@ -12,7 +12,7 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) client.time_between_calls = { @@ -34,7 +34,7 @@ class InvalidAuthorization: def get_access_token(self): return "INVALID_TOKEN" - client = firecrest.AsyncFirecrest( + client = firecrest.v1.AsyncFirecrest( firecrest_url=fc_server.url_for("/"), authorization=InvalidAuthorization() ) client.time_between_calls = { diff --git a/tests/v2/context_v2.py b/tests/v2/context_v2.py new file mode 100644 index 0000000..228b39c --- /dev/null +++ b/tests/v2/context_v2.py @@ -0,0 +1,6 @@ +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from firecrest.v2 import AsyncFirecrest, Firecrest diff --git a/tests/v2/responses/checksum.json b/tests/v2/responses/checksum.json new file mode 100644 index 0000000..03b94a0 --- /dev/null +++ b/tests/v2/responses/checksum.json @@ -0,0 +1,9 @@ +{ + "status_code": 200, + "response": { + "output": { + "algorithm": "SHA256", + "checksum": "67149111d45cf106eb92ab5be7ec08179bddea7426ddde7cfe0ae68a7cffce74" + } + } +} diff --git a/tests/v2/responses/chmod.json b/tests/v2/responses/chmod.json new file mode 100644 index 0000000..569f4d3 --- /dev/null +++ b/tests/v2/responses/chmod.json @@ -0,0 +1,14 @@ +{ + "status_code": 200, + "response": { + "output": { + "name": "/home/test1/xxx", + "type": "-", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rwxrwxrwx", + "lastModified": "2024-10-24T15:00:01", + "size": "0" + } +} diff --git a/tests/v2/responses/chown.json b/tests/v2/responses/chown.json new file mode 100644 index 0000000..9ee0eb6 --- 
/dev/null +++ b/tests/v2/responses/chown.json @@ -0,0 +1,14 @@ +{ + "status_code": 200, + "response": { + "output": { + "name": "/home/test1/xxx", + "type": "-", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rwxrwxrwx", + "lastModified": "2024-10-24T15:00:01", + "size": "0" + } +} diff --git a/tests/v2/responses/chown_not_permitted.json b/tests/v2/responses/chown_not_permitted.json new file mode 100644 index 0000000..adf8130 --- /dev/null +++ b/tests/v2/responses/chown_not_permitted.json @@ -0,0 +1,10 @@ +{ + "status_code": 403, + "response": { + "errorType": "error", + "message": "chown: changing ownership of '/home/test1/xxx': Operation not permitted", + "data": null, + "user": "test1", + "authHeader": "Bearer" + } +} diff --git a/tests/v2/responses/compress.json b/tests/v2/responses/compress.json new file mode 100644 index 0000000..4513b62 --- /dev/null +++ b/tests/v2/responses/compress.json @@ -0,0 +1,4 @@ +{ + "status_code": 204, + "response": null +} diff --git a/tests/v2/responses/delete.json b/tests/v2/responses/delete.json new file mode 100644 index 0000000..4513b62 --- /dev/null +++ b/tests/v2/responses/delete.json @@ -0,0 +1,4 @@ +{ + "status_code": 204, + "response": null +} diff --git a/tests/v2/responses/extract.json b/tests/v2/responses/extract.json new file mode 100644 index 0000000..4513b62 --- /dev/null +++ b/tests/v2/responses/extract.json @@ -0,0 +1,4 @@ +{ + "status_code": 204, + "response": null +} diff --git a/tests/v2/responses/file.json b/tests/v2/responses/file.json new file mode 100644 index 0000000..9553cf3 --- /dev/null +++ b/tests/v2/responses/file.json @@ -0,0 +1,6 @@ +{ + "status_code": 200, + "response": { + "output": "ASCII text" + } +} diff --git a/tests/v2/responses/head.json b/tests/v2/responses/head.json new file mode 100644 index 0000000..11483a8 --- /dev/null +++ b/tests/v2/responses/head.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n", + "contentType": "lines", + "startPosition": 0, + "endPosition": 10 + } + } +} diff --git a/tests/v2/responses/head_bytes.json b/tests/v2/responses/head_bytes.json new file mode 100644 index 0000000..9ea8f57 --- /dev/null +++ b/tests/v2/responses/head_bytes.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "1\n2\n3\n4", + "contentType": "bytes", + "startPosition": 0, + "endPosition": 7 + } + } +} diff --git a/tests/v2/responses/head_bytes_exclude_trailing.json b/tests/v2/responses/head_bytes_exclude_trailing.json new file mode 100644 index 0000000..e7ca91f --- /dev/null +++ b/tests/v2/responses/head_bytes_exclude_trailing.json @@ -0,0 +1,9 @@ +{ + "status_code": 200, + "response": { + "content": "1\n2\n3\n4\n5\n6\n7\n8\n9\n10", + "contentType": "bytes", + "startPosition": 0, + "endPosition": -7 + } +} diff --git a/tests/v2/responses/head_lines.json b/tests/v2/responses/head_lines.json new file mode 100644 index 0000000..394730c --- /dev/null +++ b/tests/v2/responses/head_lines.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "1\n2\n3\n", + "contentType": "lines", + "startPosition": 0, + "endPosition": 3 + } + } +} diff --git a/tests/v2/responses/head_lines_exclude_trailing.json b/tests/v2/responses/head_lines_exclude_trailing.json new file mode 100644 index 0000000..d660309 --- /dev/null +++ b/tests/v2/responses/head_lines_exclude_trailing.json @@ -0,0 +1,9 @@ +{ + "status_code": 200, + "response": { + "content": 
"1\n2\n3\n4\n5\n6\n7\n8\n9\n", + "contentType": "lines", + "startPosition": 0, + "endPosition": -3 + } +} diff --git a/tests/v2/responses/job_info.json b/tests/v2/responses/job_info.json new file mode 100644 index 0000000..482d2c6 --- /dev/null +++ b/tests/v2/responses/job_info.json @@ -0,0 +1,41 @@ +{ + "status_code": 200, + "response": { + "jobs": [ + { + "account": "test", + "allocationNodes": 1, + "cluster": "cluster", + "exitCode": { + "status": [ + "ERROR" + ], + "returnCode": 8 + }, + "group": "users", + "jobId": 1, + "name": "allocation", + "nodes": "localhost", + "partition": "part01", + "priority": 1, + "killRequestUser": "", + "state": { + "current": [ + "FAILED" + ], + "reason": "None" + }, + "time": { + "elapsed": 0, + "start": 1730968675, + "end": 1730968675, + "suspended": 0, + "limit": 7200, + "submission": 1730968674 + }, + "user": "test1", + "workingDirectory": "/home/test1" + } + ] + } +} diff --git a/tests/v2/responses/job_metadata.json b/tests/v2/responses/job_metadata.json new file mode 100644 index 0000000..2c8e588 --- /dev/null +++ b/tests/v2/responses/job_metadata.json @@ -0,0 +1,14 @@ +{ + "status_code": 200, + "response": { + "jobs": [ + { + "jobId": 26, + "script": "#!/bin/sh\npwd", + "standardInput": "/dev/null", + "standardOutput": "/home/test1/slurm-26.out", + "standardError": "/home/test1/slurm-26.out" + } + ] + } +} diff --git a/tests/v2/responses/job_submit.json b/tests/v2/responses/job_submit.json new file mode 100644 index 0000000..20f5c73 --- /dev/null +++ b/tests/v2/responses/job_submit.json @@ -0,0 +1,6 @@ +{ + "status_code": 201, + "response": { + "jobId": 27 + } +} diff --git a/tests/v2/responses/ls_home.json b/tests/v2/responses/ls_home.json new file mode 100644 index 0000000..b7dd5c7 --- /dev/null +++ b/tests/v2/responses/ls_home.json @@ -0,0 +1,17 @@ +{ + "status_code": 200, + "response": { + "output": [ + { + "name": "bin", + "type": "d", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rwxr-xr-x", + "lastModified": "2022-03-15T11:33:15", + "size": "4096" + } + ] + } +} diff --git a/tests/v2/responses/ls_home_dereference.json b/tests/v2/responses/ls_home_dereference.json new file mode 100644 index 0000000..723cc72 --- /dev/null +++ b/tests/v2/responses/ls_home_dereference.json @@ -0,0 +1,27 @@ +{ + "status_code": 200, + "response": { + "output": [ + { + "name": "bin", + "type": "d", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rwxr-xr-x", + "lastModified": "2024-10-23T09:06:00", + "size": "4096" + }, + { + "name": "link_to_file", + "type": "-", + "linkTarget": null, + "user": "root", + "group": "root", + "permissions": "rw-r--r--", + "lastModified": "2024-10-23T09:06:00", + "size": "0" + } + ] + } +} diff --git a/tests/v2/responses/ls_home_hidden.json b/tests/v2/responses/ls_home_hidden.json new file mode 100644 index 0000000..f8931e4 --- /dev/null +++ b/tests/v2/responses/ls_home_hidden.json @@ -0,0 +1,27 @@ +{ + "status_code": 200, + "response": { + "output": [ + { + "name": "bin", + "type": "d", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rwxr-xr-x", + "lastModified": "2022-03-15T11:33:15", + "size": "4096" + }, + { + "name": ".bashrc", + "type": "-", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rw-r--r--", + "lastModified": "2022-05-07T15:11:50", + "size": "1177" + } + ] + } +} diff --git a/tests/v2/responses/ls_home_recursive.json b/tests/v2/responses/ls_home_recursive.json new file mode 100644 index 
0000000..ab97f32 --- /dev/null +++ b/tests/v2/responses/ls_home_recursive.json @@ -0,0 +1,27 @@ +{ + "status_code": 200, + "response": { + "output": [ + { + "name": "/home/test1/bin", + "type": "d", + "linkTarget": null, + "user": "test1", + "group": "users", + "permissions": "rwxr-xr-x", + "lastModified": "2024-10-23T09:06:00", + "size": "4096" + }, + { + "name": "/home/test1/bin/file", + "type": "-", + "linkTarget": null, + "user": "root", + "group": "root", + "permissions": "rw-r--r--", + "lastModified": "2024-10-23T09:06:00", + "size": "0" + } + ] + } +} diff --git a/tests/v2/responses/ls_home_uid.json b/tests/v2/responses/ls_home_uid.json new file mode 100644 index 0000000..9dcdcf8 --- /dev/null +++ b/tests/v2/responses/ls_home_uid.json @@ -0,0 +1,27 @@ +{ + "status_code": 200, + "response": { + "output": [ + { + "name": "bin", + "type": "d", + "linkTarget": null, + "user": "1000", + "group": "100", + "permissions": "rwxr-xr-x", + "lastModified": "2024-10-23T09:06:00", + "size": "4096" + }, + { + "name": "link_to_file", + "type": "l", + "linkTarget": "bin/file", + "user": "0", + "group": "0", + "permissions": "rwxrwxrwx", + "lastModified": "2024-10-23T09:10:04", + "size": "8" + } + ] + } +} diff --git a/tests/v2/responses/ls_invalid_path.json b/tests/v2/responses/ls_invalid_path.json new file mode 100644 index 0000000..1f5f343 --- /dev/null +++ b/tests/v2/responses/ls_invalid_path.json @@ -0,0 +1,10 @@ +{ + "status_code": 404, + "response": { + "errorType": "error", + "message": "ls: cannot access '/home/test23': No such file or directory", + "data": null, + "user": "test1", + "authHeader": "Bearer token" + } +} diff --git a/tests/v2/responses/mkdir.json b/tests/v2/responses/mkdir.json new file mode 100644 index 0000000..e75aa75 --- /dev/null +++ b/tests/v2/responses/mkdir.json @@ -0,0 +1,15 @@ +{ + "status_code": 201, + "response": { + "output": { + "name": "/home/fireuser/new_dir", + "type": "d", + "linkTarget": "None", + "user": "fireuser", + "group": "users", + "permissions": "rwxr-xr-x", + "lastModified": "2024-12-19T10:29:59", + "size": "4096" + } + } +} diff --git a/tests/v2/responses/nodes.json b/tests/v2/responses/nodes.json new file mode 100644 index 0000000..271f018 --- /dev/null +++ b/tests/v2/responses/nodes.json @@ -0,0 +1,34 @@ +{ + "status_code": 200, + "response": { + "nodes": [ + { + "sockets": 2, + "cores": 1, + "threads": 1, + "cpus": 2, + "cpuLoad": 229.0, + "freeMemory": null, + "features": [ + "f7t" + ], + "name": "localhost", + "address": "localhost", + "hostname": "localhost", + "state": [ + "IDLE" + ], + "partitions": [ + "part01", + "part02", + "xfer" + ], + "weight": 1, + "slurmdVersion": null, + "allocMemory": 0, + "allocCpus": 0, + "idleCpus": null + } + ] + } +} diff --git a/tests/v2/responses/partitions.json b/tests/v2/responses/partitions.json new file mode 100644 index 0000000..3a43bb1 --- /dev/null +++ b/tests/v2/responses/partitions.json @@ -0,0 +1,31 @@ +{ + "status_code": 200, + "response": { + "partitions": [ + { + "partitionName": "part01", + "cpus": 2, + "totalNodes": 1, + "state": [ + "UP" + ] + }, + { + "partitionName": "part02", + "cpus": 2, + "totalNodes": 1, + "state": [ + "UP" + ] + }, + { + "partitionName": "xfer", + "cpus": 2, + "totalNodes": 1, + "state": [ + "UP" + ] + } + ] + } +} diff --git a/tests/v2/responses/reservations.json b/tests/v2/responses/reservations.json new file mode 100644 index 0000000..81bf192 --- /dev/null +++ b/tests/v2/responses/reservations.json @@ -0,0 +1,14 @@ +{ + "status_code": 200, + "response": { + 
"reservations": [ + { + "reservationName": "root_1", + "nodes": "localhost", + "endTime": 1729173600, + "startTime": 1729159200, + "features": "" + } + ] + } +} diff --git a/tests/v2/responses/stat.json b/tests/v2/responses/stat.json new file mode 100644 index 0000000..3c0fa65 --- /dev/null +++ b/tests/v2/responses/stat.json @@ -0,0 +1,17 @@ +{ + "status_code": 200, + "response": { + "output": { + "mode": 33188, + "ino": 2299756, + "dev": 76, + "nlink": 1, + "uid": 0, + "gid": 0, + "size": 27, + "atime": 1729771818, + "ctime": 1729676630, + "mtime": 1729676630 + } + } +} diff --git a/tests/v2/responses/stat_dereference.json b/tests/v2/responses/stat_dereference.json new file mode 100644 index 0000000..ee88e71 --- /dev/null +++ b/tests/v2/responses/stat_dereference.json @@ -0,0 +1,17 @@ +{ + "status_code": 200, + "response": { + "output": { + "mode": 16877, + "ino": 2297286, + "dev": 76, + "nlink": 1, + "uid": 1000, + "gid": 100, + "size": 4096, + "atime": 1729674368, + "ctime": 1729674360, + "mtime": 1729674360 + } + } +} diff --git a/tests/v2/responses/symlink.json b/tests/v2/responses/symlink.json new file mode 100644 index 0000000..ab9f051 --- /dev/null +++ b/tests/v2/responses/symlink.json @@ -0,0 +1,15 @@ +{ + "status_code": 201, + "response": { + "output": { + "name": "/home/fireuser/upload_folder/dom-60s.sh", + "type": "-", + "linkTarget": null, + "user": "fireuser", + "group": "users", + "permissions": "rw-r--r--", + "lastModified": "2024-12-19T11:00:44", + "size": "0" + } + } +} diff --git a/tests/v2/responses/systems.json b/tests/v2/responses/systems.json new file mode 100644 index 0000000..558003e --- /dev/null +++ b/tests/v2/responses/systems.json @@ -0,0 +1,115 @@ +{ + "status_code": 200, + "response": { + "systems": [ + { + "name": "cluster-api", + "host": "192.168.240.2", + "sshPort": 22, + "sshCertEmbeddedCmd": true, + "scheduler": { + "type": "slurm", + "version": "24.05.0", + "apiUrl": "http://192.168.240.2:6820", + "apiVersion": "0.0.40" + }, + "health": { + "lastChecked": "2024-10-04T14:39:29.092143Z", + "latency": 0.006885528564453125, + "healthy": true, + "message": null, + "nodes": { + "available": 1, + "total": 1 + } + }, + "probing": { + "interval": 60, + "timeout": 10, + "healthyLatency": 1.5, + "healthyLoad": 0.8, + "startupGracePeriod": 300 + }, + "fileSystems": [ + { + "path": "/home", + "dataType": "users", + "defaultWorkDir": true + }, + { + "path": "/store", + "dataType": "store", + "defaultWorkDir": false + }, + { + "path": "/archive", + "dataType": "archive", + "defaultWorkDir": false + } + ], + "datatransferJobsDirectives": [ + "#SBATCH --constraint=mc", + "#SBATCH --nodes=1", + "#SBATCH --time=0-00:15:00" + ], + "timeouts": { + "sshConnection": 5, + "sshLogin": 5, + "sshCommandExecution": 5 + } + }, + { + "name": "cluster-ssh", + "host": "192.168.240.2", + "sshPort": 22, + "sshCertEmbeddedCmd": true, + "scheduler": { + "type": "slurm", + "version": "24.05.0", + "apiUrl": null, + "apiVersion": null + }, + "health": { + "lastChecked": "2024-10-04T14:39:29.696364Z", + "latency": 0.6117508411407471, + "healthy": true, + "message": null, + "nodes": { + "available": 1, + "total": 1 + } + }, + "probing": { + "interval": 60, + "timeout": 10, + "healthyLatency": 1.5, + "healthyLoad": 0.8, + "startupGracePeriod": 300 + }, + "fileSystems": [ + { + "path": "/home", + "dataType": "users", + "defaultWorkDir": true + }, + { + "path": "/store", + "dataType": "store", + "defaultWorkDir": false + }, + { + "path": "/scratch", + "dataType": "scratch", + "defaultWorkDir": 
false + } + ], + "datatransferJobsDirectives": [], + "timeouts": { + "sshConnection": 5, + "sshLogin": 5, + "sshCommandExecution": 5 + } + } + ] + } +} diff --git a/tests/v2/responses/tail.json b/tests/v2/responses/tail.json new file mode 100644 index 0000000..ada5321 --- /dev/null +++ b/tests/v2/responses/tail.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n", + "contentType": "lines", + "startPosition": 10, + "endPosition": -1 + } + } +} diff --git a/tests/v2/responses/tail_bytes.json b/tests/v2/responses/tail_bytes.json new file mode 100644 index 0000000..d8e5732 --- /dev/null +++ b/tests/v2/responses/tail_bytes.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "\n11\n12\n", + "contentType": "bytes", + "startPosition": -7, + "endPosition": -1 + } + } +} diff --git a/tests/v2/responses/tail_bytes_exclude_beginning.json b/tests/v2/responses/tail_bytes_exclude_beginning.json new file mode 100644 index 0000000..2272b47 --- /dev/null +++ b/tests/v2/responses/tail_bytes_exclude_beginning.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "4\n5\n6\n7\n8\n9\n10\n11\n12\n", + "contentType": "bytes", + "startPosition": 7, + "endPosition": -1 + } + } +} diff --git a/tests/v2/responses/tail_lines.json b/tests/v2/responses/tail_lines.json new file mode 100644 index 0000000..078cad5 --- /dev/null +++ b/tests/v2/responses/tail_lines.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "10\n11\n12\n", + "contentType": "lines", + "startPosition": -3, + "endPosition": -1 + } + } +} diff --git a/tests/v2/responses/tail_lines_exclude_beginning.json b/tests/v2/responses/tail_lines_exclude_beginning.json new file mode 100644 index 0000000..1be5583 --- /dev/null +++ b/tests/v2/responses/tail_lines_exclude_beginning.json @@ -0,0 +1,11 @@ +{ + "status_code": 200, + "response": { + "output": { + "content": "3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n", + "contentType": "lines", + "startPosition": 3, + "endPosition": -1 + } + } +} diff --git a/tests/v2/responses/userinfo.json b/tests/v2/responses/userinfo.json new file mode 100644 index 0000000..e0656a3 --- /dev/null +++ b/tests/v2/responses/userinfo.json @@ -0,0 +1,19 @@ +{ + "status_code": 200, + "response": { + "user": { + "id": "1000", + "name": "fireuser" + }, + "group": { + "id": "100", + "name": "users" + }, + "groups": [ + { + "id": "100", + "name": "users" + } + ] + } +} diff --git a/tests/v2/responses/view.json b/tests/v2/responses/view.json new file mode 100644 index 0000000..c3a356f --- /dev/null +++ b/tests/v2/responses/view.json @@ -0,0 +1,8 @@ +{ + "status_code": 200, + "response": { + "output": { + "output": "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n" + } + } +} diff --git a/tests/v2/test_status_v2_async.py b/tests/v2/test_status_v2_async.py new file mode 100644 index 0000000..d3c1951 --- /dev/null +++ b/tests/v2/test_status_v2_async.py @@ -0,0 +1,101 @@ +import json +import pytest +import re + +from context_v2 import AsyncFirecrest +from werkzeug.wrappers import Response +from werkzeug.wrappers import Request + + +def read_json_file(filename): + with open(filename) as fp: + data = json.load(fp) + + return data + + +@pytest.fixture +def valid_client(fc_server): + class ValidAuthorization: + def get_access_token(self): + return "VALID_TOKEN" + + return AsyncFirecrest( + firecrest_url=fc_server.url_for("/"), + authorization=ValidAuthorization() + ) + + +@pytest.fixture 
+def invalid_client(fc_server): + class InvalidAuthorization: + def get_access_token(self): + return "INVALID_TOKEN" + + return AsyncFirecrest( + firecrest_url=fc_server.url_for("/"), + authorization=InvalidAuthorization() + ) + + +@pytest.fixture +def fc_server(httpserver): + httpserver.expect_request( + re.compile("/status/.*"), method="GET" + ).respond_with_handler(status_handler) + + return httpserver + + +def status_handler(request: Request): + if request.headers["Authorization"] != "Bearer VALID_TOKEN": + return Response( + json.dumps({"message": "Bad token; invalid JSON"}), + status=401, + content_type="application/json", + ) + + endpoint = request.url.split("/")[-1] + data = read_json_file(f"v2/responses/{endpoint}.json") + + ret = data["response"] + ret_status = data["status_code"] + + return Response(json.dumps(ret), + status=ret_status, + content_type="application/json") + + +@pytest.mark.asyncio +async def test_systems(valid_client): + data = read_json_file("v2/responses/systems.json") + resp = await valid_client.systems() + assert resp == data["response"]["systems"] + + +@pytest.mark.asyncio +async def test_partitions(valid_client): + data = read_json_file("v2/responses/partitions.json") + resp = await valid_client.partitions("cluster") + assert resp == data["response"]["partitions"] + + +@pytest.mark.asyncio +async def test_nodes(valid_client): + data = read_json_file("v2/responses/nodes.json") + resp = await valid_client.nodes("cluster") + assert resp == data["response"]["nodes"] + + +@pytest.mark.asyncio +async def test_reservations(valid_client): + data = read_json_file("v2/responses/reservations.json") + resp = await valid_client.reservations("cluster") + assert resp == data["response"]["reservations"] + + +@pytest.mark.asyncio +async def test_userinfo(valid_client): + data = read_json_file("v2/responses/userinfo.json") + resp = await valid_client.userinfo("cluster") + assert resp == data["response"] diff --git a/tests/v2/test_status_v2_sync.py b/tests/v2/test_status_v2_sync.py new file mode 100644 index 0000000..4936daf --- /dev/null +++ b/tests/v2/test_status_v2_sync.py @@ -0,0 +1,96 @@ +import json +import pytest +import re + +from context_v2 import Firecrest +from werkzeug.wrappers import Response +from werkzeug.wrappers import Request + + +def read_json_file(filename): + with open(filename) as fp: + data = json.load(fp) + + return data + + +@pytest.fixture +def valid_client(fc_server): + class ValidAuthorization: + def get_access_token(self): + return "VALID_TOKEN" + + return Firecrest( + firecrest_url=fc_server.url_for("/"), + authorization=ValidAuthorization() + ) + + +@pytest.fixture +def invalid_client(fc_server): + class InvalidAuthorization: + def get_access_token(self): + return "INVALID_TOKEN" + + return Firecrest( + firecrest_url=fc_server.url_for("/"), + authorization=InvalidAuthorization() + ) + + +@pytest.fixture +def fc_server(httpserver): + httpserver.expect_request( + re.compile("/status/.*"), method="GET" + ).respond_with_handler(status_handler) + + return httpserver + + +def status_handler(request: Request): + if request.headers["Authorization"] != "Bearer VALID_TOKEN": + return Response( + json.dumps({"message": "Bad token; invalid JSON"}), + status=401, + content_type="application/json", + ) + + endpoint = request.url.split("/")[-1] + data = read_json_file(f"v2/responses/{endpoint}.json") + + ret = data["response"] + ret_status = data["status_code"] + + return Response(json.dumps(ret), + status=ret_status, + content_type="application/json") + + 
+def test_systems(valid_client): + data = read_json_file("v2/responses/systems.json") + resp = valid_client.systems() + assert resp == data["response"]["systems"] + + +def test_partitions(valid_client): + data = read_json_file("v2/responses/partitions.json") + resp = valid_client.partitions("cluster") + assert resp == data["response"]["partitions"] + + +def test_nodes(valid_client): + data = read_json_file("v2/responses/nodes.json") + resp = valid_client.nodes("cluster") + assert resp == data["response"]["nodes"] + + +def test_reservations(valid_client): + data = read_json_file("v2/responses/reservations.json") + resp = valid_client.reservations("cluster") + assert resp == data["response"]["reservations"] + + +def test_userinfo(valid_client): + data = read_json_file("v2/responses/userinfo.json") + resp = valid_client.userinfo("cluster") + assert resp == data["response"] diff --git a/utils/run_unasync.py b/utils/run_unasync.py new file mode 100644 index 0000000..9a87911 --- /dev/null +++ b/utils/run_unasync.py @@ -0,0 +1,20 @@ +import unasync + + +unasync.unasync_files( + ['firecrest/v2/_async/Client.py'], + rules=[ + unasync.Rule( + fromdir="firecrest/v2/_async/", + todir="firecrest/v2/_sync/", + additional_replacements={ + "AsyncFirecrest": "Firecrest", + "AsyncClient": "Client", + "aclose": "close", + # "asyncio.sleep": "time.sleep", + # multi token replacement doesn't work, it happens manually + # TODO find a way to replace this automatically + } + ), + ] +)
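
As a companion to `utils/run_unasync.py` above, the sketch below shows one way the manual step mentioned in its TODO could be handled: run the same unasync rule, then post-process the generated sync client with plain string replacements for the dotted names that unasync's single-token substitution cannot rewrite. This is an illustrative sketch only, not part of the patch; the replacement mapping (including rewriting `import asyncio` to `import time`) and the generated file path are assumptions that would have to match the actual contents of `firecrest/v2/_async/Client.py`.

```python
# Sketch only: regenerate the sync client, then fix up multi-token names.
from pathlib import Path

import unasync

ASYNC_CLIENT = "firecrest/v2/_async/Client.py"
SYNC_CLIENT = Path("firecrest/v2/_sync/Client.py")

# Same rule as utils/run_unasync.py in the patch above.
unasync.unasync_files(
    [ASYNC_CLIENT],
    rules=[
        unasync.Rule(
            fromdir="firecrest/v2/_async/",
            todir="firecrest/v2/_sync/",
            additional_replacements={
                "AsyncFirecrest": "Firecrest",
                "AsyncClient": "Client",
                "aclose": "close",
            },
        ),
    ],
)

# unasync substitutes single NAME tokens, so dotted names like
# `asyncio.sleep` cannot be expressed in additional_replacements.
# A plain-text pass over the generated file is one option; the mapping
# below is an assumption for illustration, not the project's actual fix.
replacements = {
    "asyncio.sleep": "time.sleep",
    "import asyncio": "import time",
}
source = SYNC_CLIENT.read_text()
for old, new in replacements.items():
    source = source.replace(old, new)
SYNC_CLIENT.write_text(source)
```

If the manual edit becomes tedious, a post-processing step like this could be appended to `utils/run_unasync.py` itself so the whole sync client is generated in one invocation.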