diff --git a/dockerfiles/plantit/requirements.txt b/dockerfiles/plantit/requirements.txt index 61a7e0c2..f484f020 100644 --- a/dockerfiles/plantit/requirements.txt +++ b/dockerfiles/plantit/requirements.txt @@ -46,4 +46,5 @@ boto3==1.20.50 flower==1.0.0 scipy==1.8.0 pandas==1.4.0 -statsmodels==0.13.1 \ No newline at end of file +statsmodels==0.13.1 +pycyapi \ No newline at end of file diff --git a/plantit/plantit/terrain.py b/plantit/plantit/terrain.py index f6a2e570..849fc78c 100644 --- a/plantit/plantit/terrain.py +++ b/plantit/plantit/terrain.py @@ -20,36 +20,6 @@ logger = logging.getLogger(__name__) -class Unauthorized(Exception): - pass - - -class BadRequest(Exception): - pass - - -class BadResponse(Exception): - pass - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def get_profile(username: str, access_token: str) -> dict: - logger.debug(f"Getting CyVerse profile for user {username}") - response = requests.get( - f"https://de.cyverse.org/terrain/secured/user-info?username={username}", - headers={'Authorization': f"Bearer {access_token}"}) - response.raise_for_status() - content = response.json() - if username in content: return content[username] - else: return None - - @retry( reraise=True, wait=wait_exponential(multiplier=1, min=4, max=10), @@ -77,359 +47,3 @@ def refresh_tokens(username: str, refresh_token: str) -> (str, str): raise BadRequest(f"Missing params on token response, expected 'access_token' and 'refresh_token' but got:\n{pprint.pprint(content)}") return content['access_token'], content['refresh_token'] - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def list_dir(path: str, token: str) -> List[str]: - logger.debug(f"Listing data store directory {path}") - with requests.get( - f"https://de.cyverse.org/terrain/secured/filesystem/paged-directory?limit=1000&path={path}", - headers={'Authorization': f"Bearer {token}"}) as response: - if response.status_code == 500 and response.json()['error_code'] == 'ERR_DOES_NOT_EXIST': - raise ValueError(f"Path {path} does not exist") - - response.raise_for_status() - content = response.json() - files = content['files'] - return [file['path'] for file in files] - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -async def get_dirs_async(paths: List[str], token: str, timeout: int = 15): - logger.debug(f"Listing data store directories: {', '.join(paths)}") - urls = [f"https://de.cyverse.org/terrain/secured/filesystem/paged-directory?limit=1000&path={path}" for path in paths] - headers = { - "Authorization": f"Bearer {token}", - } - async with httpx.AsyncClient(headers=headers, timeout=timeout) as client: - tasks = [client.get(url).json() for url in urls] - results = await asyncio.gather(*tasks) - return results - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -async def create_dir_async(path: str, token: str, timeout: int = 15): - logger.debug(f"Creating data store directory {path}") - headers = { - "Authorization": f"Bearer {token}", - } - async with httpx.AsyncClient(headers=headers, timeout=timeout) as client: - response = await client.post("https://de.cyverse.org/terrain/secured/filesystem/directory/create", data=json.dumps({'path': path})) - response.raise_for_status() - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def share_dir(data: dict, token: str, timeout: int = 15): - logger.debug(f"Sharing data store path(s): {json.dumps(data)}") - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/json;charset=utf-8" - } - with requests.post("https://de.cyverse.org/terrain/secured/share", data=json.dumps(data), headers=headers, timeout=timeout) as response: - response.raise_for_status() - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -async def share_dir_async(data: dict, token: str, timeout: int = 15): - logger.debug(f"Sharing data store path(s): {json.dumps(data)}") - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/json;charset=utf-8" - } - async with httpx.AsyncClient(headers=headers, timeout=timeout) as client: - response = await client.post("https://de.cyverse.org/terrain/secured/share", data=json.dumps(data)) - response.raise_for_status() - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def unshare_dir(data: dict, token: str, timeout: int = 15): - logger.debug(f"Unsharing data store path(s): {json.dumps(data)}") - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": 'application/json;charset=utf-8' - } - with requests.post("https://de.cyverse.org/terrain/secured/unshare", data=json.dumps(data), headers=headers, timeout=timeout) as response: - response.raise_for_status() - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -async def unshare_dir_async(data: dict, token: str, timeout: int = 15): - logger.debug(f"Unsharing data store path(s): {json.dumps(data)}") - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": 'application/json;charset=utf-8' - } - async with httpx.AsyncClient(headers=headers, timeout=timeout) as client: - response = await client.post("https://de.cyverse.org/terrain/secured/unshare", data=json.dumps(data)) - response.raise_for_status() - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def get_file(path: str, token: str) -> dict: - logger.debug(f"Getting data store file {path}") - with requests.post( - "https://de.cyverse.org/terrain/secured/filesystem/stat", - data=json.dumps({'paths': [path]}), - headers={'Authorization': f"Bearer {token}", "Content-Type": 'application/json;charset=utf-8'}) as response: - if response.status_code == 500 and response.json()['error_code'] == 'ERR_DOES_NOT_EXIST': - raise ValueError(f"Path {path} does not exist") - elif response.status_code == 400: - pprint.pprint(response.json()) - - response.raise_for_status() - content = response.json() - return content['paths'][path] - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def path_exists(path, token) -> bool: - """ - Checks whether a collection (directory) or object (file) exists at the given path. - - Args: - path: The path - token: The authentication token - - Returns: True if the path exists, otherwise False - """ - - logger.debug(f"Checking if data store path exists: {path}") - - data = {'paths': [path]} - headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json;charset=utf-8"} - response = requests.post("https://de.cyverse.org/terrain/secured/filesystem/exists", data=json.dumps(data), headers=headers) - - # before invoking `raise_for_status` and bubbling an exception up, - # try to decode the response and check the reason for failure - if response.status_code != 200: - try: - content = response.json() - logger.warning(f"Bad response when checking if path '{path}' exists: {content}") - finally: pass - - response.raise_for_status() - content = response.json() - if 'paths' not in content: raise ValueError(f"No paths on response: {content}") - if path not in content['paths'].keys(): return False - return content['paths'][path] - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def path_exists_and_type(path, token) -> Tuple[bool, str]: - """ - Checks whether a collection (directory) or object (file) exists at the given path, and returns its type. - - Args: - path: The path - token: The authentication token - - Returns: (True, type of object) if the path exists, otherwise (False, None) - """ - - logger.debug(f"Checking if data store path exists (and type): {path}") - - data = {'paths': [path]} - headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} - response = requests.post("https://de.cyverse.org/terrain/secured/filesystem/stat", data=json.dumps(data), headers=headers) - - # before invoking `raise_for_status` and bubbling an exception up, - # try to decode the response and check the reason for failure - if response.status_code == 400: - return False, None - elif response.status_code != 200: - try: - content = response.json() - logger.warning(f"Bad response when checking if path '{path}' exists: {content}") - finally: - pass - - response.raise_for_status() - content = response.json() - if 'paths' not in content: raise ValueError(f"No paths on response: {content}") - if path not in content['paths'].keys(): return False, None - return True, content['paths'][path]['type'] - - # response = requests.get(f"https://de.cyverse.org/terrain/secured/filesystem/paged-directory?limit=1000&path={path}", - # headers={"Authorization": f"Bearer {token}"}) - # content = response.json() - # input_type = 'directory' - # if response.status_code != 200: - - # # the path wasn't found - # if 'error_code' in content and content['error_code'] == 'ERR_DOES_NOT_EXIST': - # print(f"Path does not exist: {path}") - # return False, None - - # if 'error_code' in content - - # if 'error_code' not in content or ('error_code' in content and content['error_code'] == 'ERR_DOES_NOT_EXIST'): - # # split the path into name and full path of parent directory - # path_split = path.rpartition('/') - # parent = path_split[0] - # name = path_split[2] - - # # send the request - # up_response = requests.get(f"https://de.cyverse.org/terrain/secured/filesystem/paged-directory?limit=1000&path={parent}", - # headers={"Authorization": f"Bearer {token}"}) - - # # there are a few reasons we might have a bad response, handle them separately - # if up_response.status_code != 200: - # # catch 401s, they likely mean the terrain token used to invoke this method is expired or invalid - # if up_response.status_code == 401: - # raise ValueError(f"Not authorized for Terrain! (likely a bad token)") - - # # try to read the response, but it might not exist - # try: - # up_content = up_response.json() - # if 'error_code' not in up_content: - # print(f"Error response: {up_content}") - # return False, None - # else: - # print(f"Error: {up_content['error_code']}") - # return False, None - # except: - # print(f"Bad response from Terrain (status {response.status_code})") - # return False, None - - # # likewise there are a few different cases for a successful response - # else: - # up_content = response.json() - - # # parent directory (collection) wasn't found - # if 'files' not in up_content: - # print(f"Directory '{parent}' does not exist") - # return False, None - - # # TODO: test this endpoint with various paths to figure out if we need the following... - - # # multiple matches were found (how? is this even a possible response? make sure) - # elif len(up_content['files']) != 1: - # print(f"Multiple files found in directory '{parent}' matching name '{name}'") - # return False, None - - # # we found a match, but it has a different name (why would this happen?) - # elif up_content['files'][0]['label'] != name: - # print(f"File '{name}' does not exist in directory '{parent}'") - # return False, None - - # # we found a good match - # else: - # input_type = 'file' - # else: - # return False, None - # return True, input_type - - -@retry( - reraise=True, - wait=wait_exponential(multiplier=1, min=4, max=10), - stop=stop_after_attempt(3), - retry=(retry_if_exception_type(ConnectionError) | retry_if_exception_type( - RequestException) | retry_if_exception_type(ReadTimeout) | retry_if_exception_type( - Timeout))) -def push_file(from_path: str, to_prefix: str, token: str): - logger.debug(f"Uploading {from_path} to data store path {to_prefix}") - with open(from_path, 'rb') as file: - with requests.post(f"https://de.cyverse.org/terrain/secured/fileio/upload?dest={to_prefix}", - headers={'Authorization': f"Bearer {token}"}, - files={'file': (basename(from_path), file, 'application/octet-stream')}) as response: - if response.status_code == 500 and response.json()['error_code'] == 'ERR_EXISTS': - logger.warning(f"File '{join(to_prefix, basename(file.name))}' already exists, skipping upload") - else: - response.raise_for_status() - - -def push_dir(from_path: str, - to_prefix: str, - include_patterns: List[str] = None, - include_names: List[str] = None, - exclude_patterns: List[str] = None, - exclude_names: List[str] = None): - """ - Pushes the contents of the local directory (matching the given criteria, if provided) to the given data store collection. - Invokes `push_file()` internally, so no need for an outer retry policy. - - Args: - from_path: The local directory path - to_prefix: The data store collection path - include_patterns: Filename patterns to include - include_names: Filenames to include - exclude_patterns: Filename patterns to exclude - exclude_names: Filenames to exclude - """ - - # check path type - is_file = isfile(from_path) - is_dir = isdir(from_path) - - if not (is_dir or is_file): - raise FileNotFoundError(f"Local path '{from_path}' does not exist") - elif is_dir: - from_paths = list_local_files(from_path, include_patterns, include_names, exclude_patterns, exclude_names) - logger.debug(f"Uploading directory '{from_path}' with {len(from_paths)} file(s) to '{to_prefix}'") - with closing(Pool(processes=multiprocessing.cpu_count())) as pool: - pool.starmap(push_file, [(path, to_prefix) for path in [str(p) for p in from_paths]]) - elif is_file: - push_file(from_path, to_prefix) - else: - raise ValueError(f"Remote path '{to_prefix}' is a file; specify a directory path instead") diff --git a/plantit/plantit/tests/integration/test_terrain.py b/plantit/plantit/tests/integration/test_terrain.py index f94d6476..e69de29b 100644 --- a/plantit/plantit/tests/integration/test_terrain.py +++ b/plantit/plantit/tests/integration/test_terrain.py @@ -1,25 +0,0 @@ -from django.test import TestCase -from tenacity import RetryError -from requests import HTTPError - -from plantit.terrain import path_exists -from plantit.tokens import TerrainToken - - -class TerrainTest(TestCase): - def test_cyverse_path_exists_when_doesnt_exist_is_false(self): - exists = path_exists('/iplant/home/shared/iplantcollaborative/testing_tools/cowsay/cowsaid.txt', TerrainToken.get()) - self.assertFalse(exists) - - def test_cyverse_path_exists_when_is_a_file_is_true(self): - exists = path_exists('/iplant/home/shared/iplantcollaborative/testing_tools/cowsay/cowsay.txt', TerrainToken.get()) - self.assertTrue(exists) - - def test_cyverse_path_exists_when_is_a_directory_is_true(self): - exists = path_exists('/iplant/home/shared/iplantcollaborative/testing_tools/cowsay', TerrainToken.get()) - self.assertTrue(exists) - - def test_throws_error_when_terrain_token_is_invalid(self): - with self.assertRaises(HTTPError) as e: - path_exists('/iplant/home/shared/iplantcollaborative/testing_tools/cowsay', 'not a token') - self.assertTrue('401' in str(e))