From a434a36b1458a97dcb0773eaa12ad5926d41a7a0 Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Wed, 18 Dec 2024 14:52:23 -0600
Subject: [PATCH 01/12] added ts_utc() function for convenient UTC timestamps

---
 zstash/utils.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/zstash/utils.py b/zstash/utils.py
index b74c1954..39915cfc 100644
--- a/zstash/utils.py
+++ b/zstash/utils.py
@@ -6,9 +6,13 @@
 import subprocess
 from fnmatch import fnmatch
 from typing import Any, List, Tuple
+from datetime import datetime, timezone
+
 from .settings import TupleTarsRow, config, logger

+def ts_utc():
+    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")

 def filter_files(subset: str, files: List[str], include: bool) -> List[str]:

From 768e3a9c9f96291bbe851bb5cadd43d29baafd0e Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Wed, 18 Dec 2024 15:02:15 -0600
Subject: [PATCH 02/12] Added more log messaging

---
 zstash/hpss_utils.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py
index 456e6a52..bde2e4ce 100644
--- a/zstash/hpss_utils.py
+++ b/zstash/hpss_utils.py
@@ -14,8 +14,9 @@
 from .hpss import hpss_put
 from .settings import BLOCK_SIZE, TupleFilesRowNoId, TupleTarsRowNoId, config, logger
-from .utils import create_tars_table, tars_table_exists
+from .utils import create_tars_table, tars_table_exists, ts_utc

+import subprocess

 # Minimum output file object
 class HashIO(object):
@@ -63,6 +64,7 @@ def add_files(
     keep: bool,
     follow_symlinks: bool,
     skip_tars_md5: bool = False,
+    non_blocking: bool = False,
 ) -> List[str]:

     # Now, perform the actual archiving
@@ -87,7 +89,7 @@ def add_files(
         tname = "{0:0{1}x}".format(itar, 6)
         # Create the tar file name by adding ".tar"
         tfname = "{}.tar".format(tname)
-        logger.info("Creating new tar archive {}".format(tfname))
+        logger.info(f"{ts_utc()}: Creating new tar archive {tfname}")
         # Open that tar file in the cache
         do_hash: bool
         if not skip_tars_md5:
@@ -136,12 +138,13 @@ def add_files(
         if i == nfiles - 1 or tarsize + next_file_size > maxsize:

             # Close current temporary file
-            logger.debug("Closing tar archive {}".format(tfname))
+            logger.debug(f"{ts_utc()}: Closing tar archive {tfname}")
             tar.close()

             tarsize = tarFileObject.tell()
             tar_md5: Optional[str] = tarFileObject.md5()
             tarFileObject.close()
+            logger.info(f"{ts_utc()}: (add_files): Completed archive file {tfname}")
             if not skip_tars_md5:
                 tar_tuple: TupleTarsRowNoId = (tfname, tarsize, tar_md5)
                 logger.info("tar name={}, tar size={}, tar md5={}".format(*tar_tuple))
@@ -156,7 +159,15 @@ def add_files(
                 hpss: str = config.hpss
             else:
                 raise TypeError("Invalid config.hpss={}".format(config.hpss))
-            hpss_put(hpss, os.path.join(cache, tfname), cache, keep)
+
+            # NOTE: These lines could be added under an "if debug" condition
+            # logger.info(f"{ts_utc()}: CONTENTS of CACHE upon call to hpss_put:")
+            # process = subprocess.run(["ls", "-l", "zstash"], capture_output=True, text=True)
+            # print(process.stdout)
+
+            logger.info(f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}")
+            hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)
+            logger.info(f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}")

             # Update database with files that have been archived
             # Add a row to the "files" table,

From 969a7c1ef477e6c6b6ff25967a80eafab5eaab37 Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Wed, 18 Dec 2024 15:11:22 -0600
Subject: [PATCH 03/12]
 Passed non_blocking variable down call-chain, implemented cache file
 deletion on blocking success, added more logging.

---
 zstash/hpss.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/zstash/hpss.py b/zstash/hpss.py
index a055fe99..a800b21f 100644
--- a/zstash/hpss.py
+++ b/zstash/hpss.py
@@ -8,7 +8,7 @@
 from .globus import globus_transfer
 from .settings import get_db_filename, logger
-from .utils import run_command
+from .utils import run_command, ts_utc

 def hpss_transfer(
@@ -17,6 +17,7 @@ def hpss_transfer(
     transfer_type: str,
     cache: str,
     keep: bool = False,
+    non_blocking: bool = False,
 ):
     if hpss == "none":
         logger.info("{}: HPSS is unavailable".format(transfer_type))
@@ -86,8 +87,11 @@ def hpss_transfer(
         os.chdir(path)

         if scheme == "globus":
+            globus_status = "UNKNOWN"
             # Transfer file using the Globus Transfer Service
-            globus_transfer(endpoint, url_path, name, transfer_type)
+            logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})")
+            globus_status = globus_transfer(endpoint, url_path, name, transfer_type, non_blocking)
+            logger.info(f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns")
         else:
             # Transfer file using `hsi`
             command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
@@ -99,16 +103,19 @@ def hpss_transfer(
         os.chdir(cwd)

     if transfer_type == "put":
+        if not keep and scheme == "globus" and globus_status == "SUCCEEDED" and not non_blocking:
+            # We should not keep the local file, so delete it now that it is on HPSS
+            os.remove(file_path)
         if not keep and scheme != "globus":
             # We should not keep the local file, so delete it now that it is on HPSS
             os.remove(file_path)

-def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
+def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False):
     """
     Put a file to the HPSS archive.
     """
-    hpss_transfer(hpss, file_path, "put", cache, keep)
+    hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking)

 def hpss_get(hpss: str, file_path: str, cache: str):

From f48a372926a5455800bc1f81345f8df548f0e3ac Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Wed, 18 Dec 2024 15:14:24 -0600
Subject: [PATCH 04/12] Passed non_blocking variable down call-chain, added
 more logging.
---
 zstash/create.py | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/zstash/create.py b/zstash/create.py
index e8819278..613301a8 100644
--- a/zstash/create.py
+++ b/zstash/create.py
@@ -7,6 +7,7 @@
 import sqlite3
 import sys
 from typing import Any, List, Tuple
+from datetime import datetime, timezone

 from six.moves.urllib.parse import urlparse

@@ -19,9 +20,9 @@
     get_files_to_archive,
     run_command,
     tars_table_exists,
+    ts_utc,
 )

-
 def create():
     cache: str
     cache, args = setup_create()
@@ -37,7 +38,7 @@ def create():
         raise TypeError("Invalid config.hpss={}".format(config.hpss))

     # Start doing actual work
-    logger.debug("Running zstash create")
+    logger.debug(f"{ts_utc()}: Running zstash create")
     logger.debug("Local path : {}".format(path))
     logger.debug("HPSS path : {}".format(hpss))
     logger.debug("Max size : {}".format(config.maxsize))
@@ -54,11 +55,13 @@ def create():
     if hpss != "none":
         url = urlparse(hpss)
         if url.scheme == "globus":
+            # identify globus endpoints
+            logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
             globus_activate(hpss)
     else:
         # config.hpss is not "none", so we need to
         # create target HPSS directory
-        logger.debug("Creating target HPSS directory")
+        logger.debug(f"{ts_utc()}:Creating target HPSS directory {hpss}")
         mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
         mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
         run_command(mkdir_command, mkdir_error_str)
@@ -71,7 +74,7 @@ def create():
         run_command(ls_command, ls_error_str)

     # Create cache directory
-    logger.debug("Creating local cache directory")
+    logger.debug(f"{ts_utc()}: Creating local cache directory")
     os.chdir(path)
     try:
         os.makedirs(cache)
@@ -84,11 +87,14 @@ def create():
     # TODO: Verify that cache is empty

     # Create and set up the database
+    logger.debug(f"{ts_utc()}: Calling create_database()")
     failures: List[str] = create_database(cache, args)

     # Transfer to HPSS. Always keep a local copy.
+    logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
     hpss_put(hpss, get_db_filename(cache), cache, keep=True)

+    logger.debug(f"{ts_utc()}: calling globus_finalize()")
     globus_finalize(non_blocking=args.non_blocking)

     if len(failures) > 0:
@@ -129,7 +135,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
     optional.add_argument(
         "--maxsize",
         type=float,
-        help="maximum size of tar archives (in GB, default 256)",
+        help="maximum size of tar archives (in KB, default 256)",
         default=256,
     )
     optional.add_argument(
@@ -145,7 +151,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
     optional.add_argument(
         "--non-blocking",
         action="store_true",
-        help="do not wait for each Globus transfer until it completes.",
+        help="do not wait for each Globus transfer to complete before creating additional archive files.
 This option will use more intermediate disk-space, but can increase throughput.",
     )
     optional.add_argument(
         "-v", "--verbose", action="store_true", help="increase output verbosity"
@@ -185,7 +191,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]:

 def create_database(cache: str, args: argparse.Namespace) -> List[str]:
     # Create new database
-    logger.debug("Creating index database")
+    logger.debug(f"{ts_utc()}:Creating index database")
     if os.path.exists(get_db_filename(cache)):
         # Remove old database
         os.remove(get_db_filename(cache))
@@ -254,6 +260,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
             args.keep,
             args.follow_symlinks,
             skip_tars_md5=args.no_tars_md5,
+            non_blocking=args.non_blocking,
         )
     except FileNotFoundError:
         raise Exception("Archive creation failed due to broken symlink.")
@@ -268,6 +275,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
             args.keep,
             args.follow_symlinks,
             skip_tars_md5=args.no_tars_md5,
+            non_blocking=args.non_blocking,
         )

     # Close database

From 35b98d4506c805d6034f5e87911630e2c2140d62 Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Wed, 18 Dec 2024 19:10:42 -0600
Subject: [PATCH 05/12] Passed non_blocking variable down call-chain,
 implemented loop for task_wait on blocking, force globus_transfer to return
 a string status, added more logging.

---
 zstash/globus.py | 64 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 59 insertions(+), 5 deletions(-)

diff --git a/zstash/globus.py b/zstash/globus.py
index 964baf9b..3769fc7a 100644
--- a/zstash/globus.py
+++ b/zstash/globus.py
@@ -11,6 +11,7 @@
 from globus_sdk import TransferAPIError, TransferClient, TransferData
 from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
 from six.moves.urllib.parse import urlparse
+from .utils import ts_utc

 from .settings import logger

@@ -158,7 +159,7 @@ def file_exists(name: str) -> bool:

 def globus_transfer(
-    remote_ep: str, remote_path: str, name: str, transfer_type: str
+    remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
 ):  # noqa: C901
     global transfer_client
     global local_endpoint
@@ -167,6 +168,7 @@ def globus_transfer(
     global task_id
     global archive_directory_listing

+    logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
     if not transfer_client:
         globus_activate("globus://" + remote_ep)
     if not transfer_client:
@@ -216,21 +218,37 @@ def globus_transfer(
     try:
         if task_id:
             task = transfer_client.get_task(task_id)
+            # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
             if task["status"] == "ACTIVE":
-                return
+                logger.info(f"{ts_utc()}: Globus task_id {task_id} Still Active")
+                return "ACTIVE"
             elif task["status"] == "SUCCEEDED":
+                logger.info(f"{ts_utc()}: Globus task_id {task_id} status = SUCCEEDED")
                 src_ep = task["source_endpoint_id"]
                 dst_ep = task["destination_endpoint_id"]
                 label = task["label"]
+                ts = ts_utc()
                 logger.info(
-                    "Globus transfer {}, from {} to {}: {} succeeded".format(
-                        task_id, src_ep, dst_ep, label
+                    "{}:Globus transfer {}, from {} to {}: {} succeeded".format(
+                        ts, task_id, src_ep, dst_ep, label
                     )
                 )
             else:
-                logger.error("Transfer FAILED")
+                logger.error(f"{ts_utc()}: Transfer FAILED (task_id = {task_id})")
+
+        # DEBUG: review accumulated items in TransferData
+        logger.info(f"{ts_utc()}: TransferData: accumulated items:")
+        attribs = transfer_data.__dict__
+        for item in attribs['data']['DATA']:
+            if item['DATA_TYPE'] == "transfer_item":
+                print(f"  source item: {item['source_path']}")
+
+        # SUBMIT new transfer here
+        logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
         task = submit_transfer_with_checks(transfer_data)
         task_id = task.get("task_id")
+        logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned task_id = {task_id} for label {transfer_data['label']}")
         transfer_data = None
     except TransferAPIError as e:
@@ -246,9 +264,45 @@ def globus_transfer(
         logger.error("Exception: {}".format(e))
         sys.exit(1)

+    # test for blocking
+    task_status = "UNKNOWN"
+    if not non_blocking:
+        logger.info(f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}")
+        wait_timeout = 5
+        max_retries = 3
+        retry_count = 0
+        while retry_count < max_retries:
+            try:
+                # Wait for the task to complete
+                transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=1)
+            except GlobusHTTPError as e:
+                logger.error(f"Exception: {e}")
+            except Exception as e:
+                logger.error(f"Unexpected Exception: {e}")
+            else:
+                curr_task = transfer_client.get_task(task_id)
+                task_status = curr_task['status']
+                if task_status == "SUCCEEDED":
+                    break
+            finally:
+                retry_count += 1
+                logger.info(f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds")
+
+        if retry_count == max_retries:
+            logger.info(f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout 5 seconds")
+            task_status = "EXHAUSTED_TIMEOUT_RETRIES"
+
+        logger.info(f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}")
+    else:
+        logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")
+
+    if transfer_type == "put":
+        return task_status
+
     if transfer_type == "get" and task_id:
         globus_wait(task_id)
+
+    return task_status

 def globus_wait(task_id: str):
     global transfer_client

From 0540e935c3612973bc5ac210d0491fa4776b90ef Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Fri, 20 Dec 2024 13:00:02 -0600
Subject: [PATCH 06/12] Made globus_block_wait a function, clarified log
 messages, revert parameters

---
 zstash/create.py |  4 +--
 zstash/globus.py | 80 ++++++++++++++++++++++++++++--------------------
 zstash/hpss.py   |  6 +++-
 3 files changed, 54 insertions(+), 36 deletions(-)

diff --git a/zstash/create.py b/zstash/create.py
index 613301a8..42a5f5fd 100644
--- a/zstash/create.py
+++ b/zstash/create.py
@@ -61,7 +61,7 @@ def create():
     else:
         # config.hpss is not "none", so we need to
         # create target HPSS directory
-        logger.debug(f"{ts_utc()}:Creating target HPSS directory {hpss}")
+        logger.debug(f"{ts_utc()}: Creating target HPSS directory {hpss}")
         mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
         mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
         run_command(mkdir_command, mkdir_error_str)
@@ -135,7 +135,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
     optional.add_argument(
         "--maxsize",
         type=float,
-        help="maximum size of tar archives (in KB, default 256)",
+        help="maximum size of tar archives (in GB, default 256)",
         default=256,
     )
     optional.add_argument(
diff --git a/zstash/globus.py b/zstash/globus.py
index 3769fc7a..312aae2d 100644
--- a/zstash/globus.py
+++ b/zstash/globus.py
@@ -218,12 +218,15 @@ def globus_transfer(
     try:
         if task_id:
             task = transfer_client.get_task(task_id)
+            prev_task_status = task["status"]
             # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
+            # NOTE: How we behave here depends upon whether we want to support multiple active transfers.
+            # Presently, we do not, except inadvertently (if status == PENDING)
             if prev_task_status == "ACTIVE":
                 logger.info(f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning.")
                 return "ACTIVE"
             elif prev_task_status == "SUCCEEDED":
                 logger.info(f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing.")
                 src_ep = task["source_endpoint_id"]
                 dst_ep = task["destination_endpoint_id"]
                 label = task["label"]
@@ -234,7 +237,7 @@ def globus_transfer(
                     )
                 )
             else:
-                logger.error(f"{ts_utc()}: Transfer FAILED (task_id = {task_id})")
+                logger.error(f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing.")

         # DEBUG: review accumulated items in TransferData
         logger.info(f"{ts_utc()}: TransferData: accumulated items:")
         attribs = transfer_data.__dict__
         for item in attribs['data']['DATA']:
             if item['DATA_TYPE'] == "transfer_item":
                 print(f"  source item: {item['source_path']}")

         # SUBMIT new transfer here
         logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
         task = submit_transfer_with_checks(transfer_data)
         task_id = task.get("task_id")
+        # NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
+        # the "label" given here refers only to the LAST tarfile in the TransferData list.
-        logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned task_id = {task_id} for label {transfer_data['label']}")
+        logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}")
         transfer_data = None
     except TransferAPIError as e:
@@ -264,9 +267,9 @@ def globus_transfer(
         logger.error("Exception: {}".format(e))
         sys.exit(1)

-    # test for blocking
+    # test for blocking on new task_id
     task_status = "UNKNOWN"
     if not non_blocking:
+        task_status = globus_block_wait(task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5)
     else:
         logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")

     if transfer_type == "put":
         return task_status
@@ -294,6 +297,40 @@ def globus_transfer(

+def globus_block_wait(task_id: str, wait_timeout: int, polling_interval: int, max_retries: int):
+    global transfer_client
+
+    # poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting after 5*2 = 10 hours
Report every 2 hours, stop waiting aftert 5*2 = 10 hours + logger.info(f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}") + task_status = "UNKNOWN" + retry_count = 0 + while retry_count < max_retries: + try: + # Wait for the task to complete + transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=10) + except GlobusHTTPError as e: + logger.error(f"Exception: {e}") + except Exception as e: + logger.error(f"Unexpected Exception: {e}") + else: + curr_task = transfer_client.get_task(task_id) + task_status = curr_task['status'] + if task_status == "SUCCEEDED": + break + finally: + retry_count += 1 + logger.info(f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds") + + if retry_count == max_retries: + logger.info(f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds") + task_status = "EXHAUSTED_TIMEOUT_RETRIES" + + logger.info(f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}") + + return task_status + + def globus_wait(task_id: str): global transfer_client diff --git a/zstash/hpss.py b/zstash/hpss.py index a800b21f..5d5486f4 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -91,7 +91,11 @@ def hpss_transfer( # Transfer file using the Globus Transfer Service logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})") globus_status = globus_transfer(endpoint, url_path, name, transfer_type, non_blocking) - logger.info(f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns") + logger.info(f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}") + # NOTE: Here, the status could be "TIMEOUT_RETRIES_EXHAUSTED", meaning a very long transfer + # or perhaps transfer is hanging. We should decide whether to ignore it, or cancel it, but + # we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer + # return a tuple (task_id, status). else: # Transfer file using `hsi` command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name) From d7bae50b8fd258acba37cea167671d89ee7b08db Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Fri, 20 Dec 2024 13:15:57 -0600 Subject: [PATCH 07/12] datetime and timezone unneeded here (ts_utc in util), and corrected comment in hpss --- zstash/create.py | 1 - zstash/hpss.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/zstash/create.py b/zstash/create.py index 42a5f5fd..c1c46a5f 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -7,7 +7,6 @@ import sqlite3 import sys from typing import Any, List, Tuple -from datetime import datetime, timezone from six.moves.urllib.parse import urlparse diff --git a/zstash/hpss.py b/zstash/hpss.py index 5d5486f4..578efbe4 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -92,7 +92,7 @@ def hpss_transfer( logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})") globus_status = globus_transfer(endpoint, url_path, name, transfer_type, non_blocking) logger.info(f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}") - # NOTE: Here, the status could be "TIMEOUT_RETRIES_EXHAUSTED", meaning a very long transfer + # NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer # or perhaps transfer is hanging. We should decide whether to ignore it, or cancel it, but # we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer # return a tuple (task_id, status). 
From 633a5439001a031865519b4c63ba894d370560b7 Mon Sep 17 00:00:00 2001
From: Ryan Forsyth
Date: Thu, 26 Dec 2024 15:44:44 -0800
Subject: [PATCH 08/12] Fix pre-commit checks and run tests

---
 zstash/create.py     |  1 +
 zstash/globus.py     | 63 ++++++++++++++++++++++++++++++++++----------
 zstash/hpss.py       | 19 ++++++++++++---
 zstash/hpss_utils.py |  9 ++++---
 zstash/utils.py      |  5 ++--
 5 files changed, 68 insertions(+), 29 deletions(-)

diff --git a/zstash/create.py b/zstash/create.py
index c1c46a5f..d16287bb 100644
--- a/zstash/create.py
+++ b/zstash/create.py
@@ -22,6 +22,7 @@
     ts_utc,
 )

+
 def create():
     cache: str
     cache, args = setup_create()
diff --git a/zstash/globus.py b/zstash/globus.py
index 312aae2d..cca077d8 100644
--- a/zstash/globus.py
+++ b/zstash/globus.py
@@ -11,9 +11,9 @@
 from globus_sdk import TransferAPIError, TransferClient, TransferData
 from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
 from six.moves.urllib.parse import urlparse
-from .utils import ts_utc

 from .settings import logger
+from .utils import ts_utc

 hpss_endpoint_map = {
     "ALCF": "de463ec4-6d04-11e5-ba46-22000b92c6ec",
@@ -158,9 +158,10 @@ def file_exists(name: str) -> bool:
     return False

-def globus_transfer(
+# C901 'globus_transfer' is too complex (20)
+def globus_transfer(  # noqa: C901
     remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
-):  # noqa: C901
+):
     global transfer_client
     global local_endpoint
@@ -221,14 +222,20 @@ def globus_transfer(
             prev_task_status = task["status"]
             # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
             # NOTE: How we behave here depends upon whether we want to support multiple active transfers.
             # Presently, we do not, except inadvertently (if status == PENDING)
             if prev_task_status == "ACTIVE":
-                logger.info(f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning.")
+                logger.info(
+                    f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning."
+                )
                 return "ACTIVE"
             elif prev_task_status == "SUCCEEDED":
-                logger.info(f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing.")
+                logger.info(
+                    f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing."
+                )
                 src_ep = task["source_endpoint_id"]
                 dst_ep = task["destination_endpoint_id"]
                 label = task["label"]
@@ -240,22 +247,28 @@ def globus_transfer(
                     )
                 )
             else:
-                logger.error(f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing.")
+                logger.error(
+                    f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing."
+                )

         # DEBUG: review accumulated items in TransferData
         logger.info(f"{ts_utc()}: TransferData: accumulated items:")
         attribs = transfer_data.__dict__
-        for item in attribs['data']['DATA']:
-            if item['DATA_TYPE'] == "transfer_item":
+        for item in attribs["data"]["DATA"]:
+            if item["DATA_TYPE"] == "transfer_item":
                 print(f"  source item: {item['source_path']}")

         # SUBMIT new transfer here
         logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
         task = submit_transfer_with_checks(transfer_data)
         task_id = task.get("task_id")
         # NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
         # the "label" given here refers only to the LAST tarfile in the TransferData list.
-        logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}")
+        logger.info(
+            f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
+        )
         transfer_data = None
     except TransferAPIError as e:
@@ -277,7 +290,9 @@ def globus_transfer(
     # test for blocking on new task_id
     task_status = "UNKNOWN"
     if not non_blocking:
-        task_status = globus_block_wait(task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5)
+        task_status = globus_block_wait(
+            task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5
+        )
     else:
         logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")
@@ -294,35 +309,47 @@ def globus_transfer(

-def globus_block_wait(task_id: str, wait_timeout: int, polling_interval: int, max_retries: int):
+def globus_block_wait(
+    task_id: str, wait_timeout: int, polling_interval: int, max_retries: int
+):
     global transfer_client

     # poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting after 5*2 = 10 hours
-    logger.info(f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}")
+    logger.info(
+        f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
+    )
     task_status = "UNKNOWN"
     retry_count = 0
     while retry_count < max_retries:
         try:
             # Wait for the task to complete
-            transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=10)
-        except GlobusHTTPError as e:
-            logger.error(f"Exception: {e}")
+            transfer_client.task_wait(
+                task_id, timeout=wait_timeout, polling_interval=10
+            )
+        # except GlobusHTTPError as e:
+        #     logger.error(f"Exception: {e}")
         except Exception as e:
             logger.error(f"Unexpected Exception: {e}")
         else:
             curr_task = transfer_client.get_task(task_id)
-            task_status = curr_task['status']
+            task_status = curr_task["status"]
             if task_status == "SUCCEEDED":
                 break
         finally:
             retry_count += 1
-            logger.info(f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds")
+            logger.info(
+                f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
+            )

     if retry_count == max_retries:
-        logger.info(f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds")
+        logger.info(
+            f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds"
+        )
         task_status = "EXHAUSTED_TIMEOUT_RETRIES"

-    logger.info(f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}")
+    logger.info(
+        f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
+    )

     return task_status

diff --git a/zstash/hpss.py b/zstash/hpss.py
index 578efbe4..e474e8b0 100644
--- a/zstash/hpss.py
+++ b/zstash/hpss.py
@@ -90,8 +90,12 @@ def hpss_transfer(
             globus_status = "UNKNOWN"
             # Transfer file using the Globus Transfer Service
             logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})")
-            globus_status = globus_transfer(endpoint, url_path, name, transfer_type, non_blocking)
+            globus_status = globus_transfer(
+                endpoint, url_path, name, transfer_type, non_blocking
+            )
-            logger.info(f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}")
+            logger.info(
+                f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}"
+            )
             # NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer
             # or perhaps transfer is hanging. We should decide whether to ignore it, or cancel it, but
             # we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer
             # return a tuple (task_id, status).
@@ -107,7 +111,12 @@ def hpss_transfer(
         os.chdir(cwd)

     if transfer_type == "put":
-        if not keep and scheme == "globus" and globus_status == "SUCCEEDED" and not non_blocking:
+        if (
+            not keep
+            and scheme == "globus"
+            and globus_status == "SUCCEEDED"
+            and not non_blocking
+        ):
             # We should not keep the local file, so delete it now that it is on HPSS
             os.remove(file_path)
         if not keep and scheme != "globus":
             # We should not keep the local file, so delete it now that it is on HPSS
             os.remove(file_path)
@@ -121,7 +130,9 @@ def hpss_transfer(

-def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False):
+def hpss_put(
+    hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
+):
     """
     Put a file to the HPSS archive.
     """
diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py
index bde2e4ce..707f73a8 100644
--- a/zstash/hpss_utils.py
+++ b/zstash/hpss_utils.py
@@ -16,7 +16,6 @@
 from .settings import BLOCK_SIZE, TupleFilesRowNoId, TupleTarsRowNoId, config, logger
 from .utils import create_tars_table, tars_table_exists, ts_utc

-import subprocess

 # Minimum output file object
 class HashIO(object):
@@ -165,9 +164,13 @@ def add_files(
             # process = subprocess.run(["ls", "-l", "zstash"], capture_output=True, text=True)
             # print(process.stdout)

-            logger.info(f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}")
+            logger.info(
+                f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}"
+            )
             hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)
-            logger.info(f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}")
+            logger.info(
+                f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}"
+            )

             # Update database with files that have been archived
             # Add a row to the "files" table,
diff --git a/zstash/utils.py b/zstash/utils.py
index 39915cfc..ea793603 100644
--- a/zstash/utils.py
+++ b/zstash/utils.py
@@ -4,16 +4,17 @@
 import shlex
 import sqlite3
 import subprocess
+from datetime import datetime, timezone
 from fnmatch import fnmatch
 from typing import Any, List, Tuple

-from datetime import datetime, timezone
-
 from .settings import TupleTarsRow, config, logger

+
 def ts_utc():
     return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")

+
 def filter_files(subset: str, files: List[str], include: bool) -> List[str]:

     # Construct list of files to filter, based on

From 5738ff2d6d7d6665fc3ce8486cbba7494d2edaaa Mon Sep 17 00:00:00 2001
From: Anthony Bartoletti
Date: Thu, 2 Jan 2025 13:28:04 -0600
Subject: [PATCH 09/12] minor cleanup

---
 zstash/globus.py | 2 --
 zstash/hpss.py   | 7 ++-----
 2 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/zstash/globus.py b/zstash/globus.py
index 312aae2d..f0e25ede 100644
--- a/zstash/globus.py
+++ b/zstash/globus.py
@@ -296,8 +296,6 @@ def globus_block_wait(task_id: str, wait_timeout: int, polling_interval: int, ma
         try:
             # Wait for the task to complete
             transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=10)
-        except GlobusHTTPError as e:
-            logger.error(f"Exception: {e}")
         except Exception as e:
             logger.error(f"Unexpected Exception: {e}")
         else:
diff --git a/zstash/hpss.py b/zstash/hpss.py
index 578efbe4..8388d15d 100644
--- a/zstash/hpss.py
+++ b/zstash/hpss.py
@@ -107,11 +107,8 @@ def hpss_transfer(
         os.chdir(cwd)

     if transfer_type == "put":
-        if not keep and scheme == "globus" and globus_status == "SUCCEEDED" and not non_blocking:
globus_status == "SUCCEEDED" and not non_blocking: - # We should not keep the local file, so delete it now that it is on HPSS - os.remove(file_path) - if not keep and scheme != "globus": - # We should not keep the local file, so delete it now that it is on HPSS + if not keep: + if (scheme != "globus") or (globus_status == "SUCCEEDED" and not non_blocking): os.remove(file_path) From 7ed1f92024cfee3214788c5c628d90790af13e5e Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Tue, 7 Jan 2025 08:00:55 -0800 Subject: [PATCH 10/12] Unit tests passing --- tests/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/base.py b/tests/base.py index a992dd81..de88ee4b 100644 --- a/tests/base.py +++ b/tests/base.py @@ -325,7 +325,7 @@ def add_files(self, use_hpss, zstash_path, keep=False, cache=None): expected_present = ["Transferring file to HPSS"] else: expected_present = ["put: HPSS is unavailable"] - expected_present += ["INFO: Creating new tar archive"] + expected_present += ["Creating new tar archive"] # Make sure none of the old files or directories are moved. expected_absent = ["ERROR", "file0", "file_empty", "empty_dir"] self.check_strings(cmd, output + err, expected_present, expected_absent) From c9ea93737b0019c1e679344199dd16d3b6f3adff Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Tue, 7 Jan 2025 08:08:30 -0800 Subject: [PATCH 11/12] Fix pre-commit error --- zstash/hpss.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zstash/hpss.py b/zstash/hpss.py index 92731b04..44bc0f13 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -112,7 +112,9 @@ def hpss_transfer( if transfer_type == "put": if not keep: - if (scheme != "globus") or (globus_status == "SUCCEEDED" and not non_blocking): + if (scheme != "globus") or ( + globus_status == "SUCCEEDED" and not non_blocking + ): os.remove(file_path) From 4a2a8df3055d3dc8cce6b266c4b490d363f95a7b Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Wed, 8 Jan 2025 13:06:42 -0600 Subject: [PATCH 12/12] added tests3 scripts and README for blocking tests --- tests3/README_TEST_BLOCKING | 103 +++++++++++++++++++++++++++++++++ tests3/gen_data.sh | 11 ++++ tests3/gen_data_runner.sh | 8 +++ tests3/reset_test.sh | 5 ++ tests3/snapshot.sh | 14 +++++ tests3/test_zstash_blocking.sh | 73 +++++++++++++++++++++++ 6 files changed, 214 insertions(+) create mode 100644 tests3/README_TEST_BLOCKING create mode 100755 tests3/gen_data.sh create mode 100755 tests3/gen_data_runner.sh create mode 100755 tests3/reset_test.sh create mode 100755 tests3/snapshot.sh create mode 100755 tests3/test_zstash_blocking.sh diff --git a/tests3/README_TEST_BLOCKING b/tests3/README_TEST_BLOCKING new file mode 100644 index 00000000..0a0fb6fe --- /dev/null +++ b/tests3/README_TEST_BLOCKING @@ -0,0 +1,103 @@ + +This document outlines the procedures conducted to test the zstash bloclking +and non-blocking behavior. + +Note: As it was intended to test blocking with regard to archive tar-creations +vs Globus transfers, it wsa convenient to have both source snd destination be +the same Globus endpoint. Effectively, we are employing Globus merely to move +tar archive files from one directory to another on the same file system. + +The core intent in implementing zstash blocking is to address a potential +"low-disk" condition, where tar-files created to archive source files could +add substantially to the disk load. 
To avoid disk exhaustion, in "blocking" mode ("--non-blocking" is absent on
the command line) tar-file creation will pause to wait for the previous
tar-file's Globus transfer to complete, so that the local copy can be deleted
before the next tar-file is created.

I. File System Setup
====================

As one may want, or need, to re-conduct testing under varied conditions, the
test script:

    test_zstash_blocking.sh

will establish the following directory structure in the operator's current
working directory:

    [CWD]/src_data/

        - contains files to be tar-archived. One can experiment
          with different sizes of files to trigger behaviors.

    [CWD]/src_data/zstash/

        - default location of tarfiles produced. This directory is
          created automatically by zstash unless "--cache" indicates
          an alternate location.

    [CWD]/dst_data/

        - destination for Globus transfer of archives.

    [CWD]/tmp_cache/

        - [Optional] alternative location for tar-file generation.

Note: It may be convenient to create a "hold" directory to store files of
various sizes that can be easily produced by running the supplied scripts.

    gen_data.sh
    gen_data_runner.sh

The files to be used for a given test must be moved or copied to the src_data
directory before a test is initiated.

Note: It never hurts to run the supplied script:

    reset_test.sh

before a test run. This will delete any archives in the src_data/zstash
cache and the receiving dst_data directories, and delete the src_data/zstash
directory itself if it exists. This ensures a clean restart for testing.
The raw data files placed into src_data are not affected.

II. Running the Test Script
===========================

The test script "test_zstash_blocking.sh" accepts two positional parameters:

    test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) [NEW_CREDS]

On an initial run, or whenever Globus complains of authentication failures,
add "NEW_CREDS" as the second parameter. This will act to delete your
cached Globus credentials and trigger prompts for you to paste login URLs
to your browser (generally one per endpoint), which requires that you conduct
a login sequence and then paste a returned key-value at the bash command
prompt. After both keys are accepted, you can re-run the test script
without "NEW_CREDS" until the credentials expire (usually 24 hours).

If "BLOCKING" is selected, zstash will run in default mode, waiting for
each tar file to complete transfer before generating another tar file.

If "NON_BLOCKING" is selected, the zstash flag "--non-blocking" is supplied
to the zstash command line, and tar files continue to be created in parallel
to running Globus transfers.

It is suggested that you run the test script with

    test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1

so that your command prompt returns and you can monitor progress with

    snapshot.sh

which will provide a view of both the tarfile cache and the destination
directory for delivered tar files. It is also suggested that you name your
logfile to reflect the date, and whether BLOCKING or not.


FINAL NOTE: In the zstash code, the tar file "MINSIZE" parameter is taken
to be (int) multiples of 1 GB. During testing, this had been changed to
"multiple of 100K" for rapid testing. It may be useful to expose this as
a command line parameter for debugging purposes.
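As a concrete sketch of that last suggestion, a hypothetical debug flag could
follow the existing argparse pattern in setup_create() in zstash/create.py.
The flag name and wiring below are illustrative only, not part of these
patches:

    # Hypothetical --minsize-multiple debug option, modeled on the existing
    # optional.add_argument() calls in zstash/create.py:
    optional.add_argument(
        "--minsize-multiple",
        type=int,
        default=1024 * 1024 * 1024,  # current behavior: multiples of 1 GB
        help="debug only: byte multiple used for the tar-file size threshold",
    )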
diff --git a/tests3/gen_data.sh b/tests3/gen_data.sh
new file mode 100755
index 00000000..87754a8c
--- /dev/null
+++ b/tests3/gen_data.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+if [[ $# -lt 2 ]]; then
+    echo "Usage: gen_data.sh <length> <outfile>"
+    exit 0
+fi
+
+len=$1
+out=$2
+
+head -c $len < /dev/urandom > $out
diff --git a/tests3/gen_data_runner.sh b/tests3/gen_data_runner.sh
new file mode 100755
index 00000000..fec73a2f
--- /dev/null
+++ b/tests3/gen_data_runner.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+i=1
+
+while [[ $i -lt 12 ]]; do
+    ./gen_data.sh 1000000 small_0${i}_1M
+    i=$((i+1))
+done
diff --git a/tests3/reset_test.sh b/tests3/reset_test.sh
new file mode 100755
index 00000000..a0d960c7
--- /dev/null
+++ b/tests3/reset_test.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+rm -rf src_data/zstash/
+rm -f dst_data/*
+rm -f tmp_cache/*
diff --git a/tests3/snapshot.sh b/tests3/snapshot.sh
new file mode 100755
index 00000000..869812dc
--- /dev/null
+++ b/tests3/snapshot.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+echo "dst_data:"
+ls -l dst_data
+
+echo ""
+echo "src_data/zstash:"
+ls -l src_data/zstash
+
+echo ""
+echo "tmp_cache:"
+ls -l tmp_cache
+
+
diff --git a/tests3/test_zstash_blocking.sh b/tests3/test_zstash_blocking.sh
new file mode 100755
index 00000000..2d58622d
--- /dev/null
+++ b/tests3/test_zstash_blocking.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+
+if [[ $# -lt 1 ]]; then
+    echo "Usage: test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) [NEW_CREDS]"
+    echo "  One of \"BLOCKING\" or \"NON_BLOCKING\" must be supplied as the"
+    echo "  first parameter."
+    echo "  Add \"NEW_CREDS\" if Globus credentials have expired."
+    echo "  This will cause Globus to prompt for new credentials."
+    exit 0
+fi
+
+NON_BLOCKING=1
+
+if [[ $1 == "BLOCKING" ]]; then
+    NON_BLOCKING=0
+elif [[ $1 == "NON_BLOCKING" ]]; then
+    NON_BLOCKING=1
+else
+    echo "ERROR: Must supply \"BLOCKING\" or \"NON_BLOCKING\" as 1st argument."
+    exit 1
+fi
+
+# remove old auth data, if exists, so that globus will prompt us
+# for new auth credentials in case they have expired:
+if [[ $# -gt 1 ]]; then
+    if [[ $2 == "NEW_CREDS" ]]; then
+        rm -f ~/.globus-native-apps.cfg
+    fi
+fi
+
+
+base_dir=`pwd`
+base_dir=`realpath $base_dir`
+
+
+# See if we are running the zstash we THINK we are:
+echo "CALLING zstash version"
+zstash version
+echo ""
+
+# Selectable Endpoint UUIDs
+ACME1_GCSv5_UUID=6edb802e-2083-47f7-8f1c-20950841e46a
+LCRC_IMPROV_DTN_UUID=15288284-7006-4041-ba1a-6b52501e49f1
+NERSC_HPSS_UUID=9cd89cfd-6d04-11e5-ba46-22000b92c6ec
+
+# 12 piControl ocean monthly files, 49 GB
+SRC_DATA=$base_dir/src_data
+DST_DATA=$base_dir/dst_data
+
+SRC_UUID=$LCRC_IMPROV_DTN_UUID
+DST_UUID=$LCRC_IMPROV_DTN_UUID
+
+# Optional
+TMP_CACHE=$base_dir/tmp_cache
+
+mkdir -p $SRC_DATA $DST_DATA $TMP_CACHE
+
+# Make maxsize 1 GB. This will create a new tar after every 1 GB of data.
+# (Since individual files are 4 GB, we will get 1 tarfile per datafile.)
+
+if [[ $NON_BLOCKING -eq 1 ]]; then
+    echo "TEST: NON_BLOCKING:"
+    zstash create -v --hpss=globus://$DST_UUID/$DST_DATA --maxsize 1 --non-blocking $SRC_DATA
+else
+    echo "TEST: BLOCKING:"
+    zstash create -v --hpss=globus://$DST_UUID/$DST_DATA --maxsize 1 $SRC_DATA
+    # zstash create -v --hpss=globus://$DST_UUID --maxsize 1 --non-blocking --cache $TMP_CACHE $SRC_DATA
+fi
+
+echo "Testing Completed"
+
+exit 0
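Since the test procedure above captures a logfile, a small helper like the
following can summarize how long each archive dispatch blocked, using the
DIVING/SURFACE log markers added in these patches. This is a sketch for log
analysis only, not part of the PR; it assumes nothing beyond the timestamp
format produced by ts_utc() in zstash/utils.py:

    import re
    from datetime import datetime

    TS_FORMAT = "%Y%m%d_%H%M%S_%f"  # matches ts_utc() in zstash/utils.py
    MARKER = re.compile(r"(\d{8}_\d{6}_\d{6}): (DIVING|SURFACE)")

    def dispatch_durations(logfile: str):
        """Yield seconds elapsed between each DIVING/SURFACE pair in a log."""
        start = None
        with open(logfile) as f:
            for line in f:
                m = MARKER.search(line)
                if not m:
                    continue
                ts = datetime.strptime(m.group(1), TS_FORMAT)
                if m.group(2) == "DIVING":
                    start = ts
                elif start is not None:
                    yield (ts - start).total_seconds()
                    start = None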