diff --git a/tests/base.py b/tests/base.py
index a992dd81..de88ee4b 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -325,7 +325,7 @@ def add_files(self, use_hpss, zstash_path, keep=False, cache=None):
         expected_present = ["Transferring file to HPSS"]
     else:
         expected_present = ["put: HPSS is unavailable"]
-    expected_present += ["INFO: Creating new tar archive"]
+    expected_present += ["Creating new tar archive"]
     # Make sure none of the old files or directories are moved.
     expected_absent = ["ERROR", "file0", "file_empty", "empty_dir"]
     self.check_strings(cmd, output + err, expected_present, expected_absent)
diff --git a/tests3/README_TEST_BLOCKING b/tests3/README_TEST_BLOCKING
new file mode 100644
index 00000000..0a0fb6fe
--- /dev/null
+++ b/tests3/README_TEST_BLOCKING
@@ -0,0 +1,103 @@
+
+This document outlines the procedures conducted to test the zstash blocking
+and non-blocking behavior.
+
+Note: As the intent was to test blocking of archive tar-creation against
+Globus transfers, it was convenient to have both source and destination be
+the same Globus endpoint. Effectively, we are employing Globus merely to move
+tar archive files from one directory to another on the same file system.
+
+The core intent in implementing zstash blocking is to address a potential
+"low-disk" condition, where tar-files created to archive source files could
+add substantially to the disk load. To avoid disk exhaustion, under blocking
+operation ("--non-blocking" is absent from the command line), tar-file
+creation will pause to wait for the previous tar-file's Globus transfer to
+complete, so that the local copy can be deleted before the next tar-file is
+created.
+
+I. File System Setup
+====================
+
+As one may want, or need, to re-conduct testing under varied conditions, the
+test script:
+
+    test_zstash_blocking.sh
+
+will establish the following directory structure in the operator's current
+working directory:
+
+    [CWD]/src_data/
+
+        - contains files to be tar-archived. One can experiment
+          with different sizes of files to trigger behaviors.
+
+    [CWD]/src_data/zstash/
+
+        - default location of tarfiles produced. This directory is
+          created automatically by zstash unless "--cache" indicates
+          an alternate location.
+
+    [CWD]/dst_data/
+
+        - destination for Globus transfer of archives.
+
+    [CWD]/tmp_cache/
+
+        - [Optional] alternative location for tar-file generation.
+
+Note: It may be convenient to create a "hold" directory to store files of
+various sizes that can be easily produced by running the supplied scripts:
+
+    gen_data.sh
+    gen_data_runner.sh
+
+The files to be used for a given test must be moved or copied to the src_data
+directory before a test is initiated.
+
+Note: It never hurts to run the supplied script:
+
+    reset_test.sh
+
+before a test run. This will delete any archives in the src_data/zstash
+cache and the receiving dst_data directories, and delete the src_data/zstash
+directory itself if it exists. This ensures a clean restart for testing.
+The raw data files placed into src_data are not affected.
+
+II. Running the Test Script
+===========================
+
+The test script "test_zstash_blocking.sh" accepts two positional parameters:
+
+    test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) [NEW_CREDS]
+
+On an initial run, or whenever Globus complains of authentication failures,
+add "NEW_CREDS" as the second parameter. This will delete your
+cached Globus credentials and trigger prompts for you to paste login URLs
+into your browser (generally one per endpoint), where you conduct a login
+sequence and then paste a returned key back at the bash command prompt.
+After both keys are accepted, you can re-run the test script without
+"NEW_CREDS", until the credentials expire (usually 24 hours).
+
+If "BLOCKING" is selected, zstash will run in default mode, waiting for
+each tar file to complete its transfer before generating another tar file.
+
+If "NON_BLOCKING" is selected, the zstash flag "--non-blocking" is supplied
+on the zstash command line, and tar files continue to be created in parallel
+with running Globus transfers.
+
+It is suggested that you run the test script with
+
+    test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1
+
+so that your command prompt returns and you can monitor progress with
+
+    snapshot.sh
+
+which will provide a view of both the tarfile cache and the destination
+directory for delivered tar files. It is also suggested that you name your
+logfile to reflect the date, and whether the run was BLOCKING or not.
+
+
+FINAL NOTE: In the zstash code, the tar file "MINSIZE" parameter is taken
+to be (int) multiples of 1 GB. During testing, this had been changed to
+multiples of 100K for rapid testing. It may be useful to expose this as
+a command line parameter for debugging purposes.
diff --git a/tests3/gen_data.sh b/tests3/gen_data.sh
new file mode 100755
index 00000000..87754a8c
--- /dev/null
+++ b/tests3/gen_data.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+if [[ $# -lt 2 ]]; then
+    echo "Usage: gen_data.sh <length> <output_file>"
+    exit 0
+fi
+
+len=$1
+out=$2
+
+head -c $len < /dev/urandom > $out
diff --git a/tests3/gen_data_runner.sh b/tests3/gen_data_runner.sh
new file mode 100755
index 00000000..fec73a2f
--- /dev/null
+++ b/tests3/gen_data_runner.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+i=1
+
+while [[ $i -lt 12 ]]; do
+    ./gen_data.sh 1000000 small_0${i}_1M
+    i=$((i+1))
+done
diff --git a/tests3/reset_test.sh b/tests3/reset_test.sh
new file mode 100755
index 00000000..a0d960c7
--- /dev/null
+++ b/tests3/reset_test.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+rm -rf src_data/zstash/
+rm -f dst_data/*
+rm -f tmp_cache/*
diff --git a/tests3/snapshot.sh b/tests3/snapshot.sh
new file mode 100755
index 00000000..869812dc
--- /dev/null
+++ b/tests3/snapshot.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+echo "dst_data:"
+ls -l dst_data
+
+echo ""
+echo "src_data/zstash:"
+ls -l src_data/zstash
+
+echo ""
+echo "tmp_cache:"
+ls -l tmp_cache
+
+
diff --git a/tests3/test_zstash_blocking.sh b/tests3/test_zstash_blocking.sh
new file mode 100755
index 00000000..2d58622d
--- /dev/null
+++ b/tests3/test_zstash_blocking.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+
+if [[ $# -lt 1 ]]; then
+    echo "Usage: test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) [NEW_CREDS]"
+    echo "    One of \"BLOCKING\" or \"NON_BLOCKING\" must be supplied as the"
+    echo "    first parameter."
+    echo "    Add \"NEW_CREDS\" if Globus credentials have expired."
+    echo "    This will cause Globus to prompt for new credentials."
+    exit 0
+fi
+
+NON_BLOCKING=1
+
+if [[ $1 == "BLOCKING" ]]; then
+    NON_BLOCKING=0
+elif [[ $1 == "NON_BLOCKING" ]]; then
+    NON_BLOCKING=1
+else
+    echo "ERROR: Must supply \"BLOCKING\" or \"NON_BLOCKING\" as 1st argument."
+    exit 0
+fi
+
+# Remove old auth data, if it exists, so that globus will prompt us
+# for new auth credentials in case they have expired:
+if [[ $# -gt 1 ]]; then
+    if [[ $2 == "NEW_CREDS" ]]; then
+        rm -f ~/.globus-native-apps.cfg
+    fi
+fi
+
+
+base_dir=`pwd`
+base_dir=`realpath $base_dir`
+
+
+# See if we are running the zstash we THINK we are:
+echo "CALLING zstash version"
+zstash version
+echo ""
+
+# Selectable Endpoint UUIDs
+ACME1_GCSv5_UUID=6edb802e-2083-47f7-8f1c-20950841e46a
+LCRC_IMPROV_DTN_UUID=15288284-7006-4041-ba1a-6b52501e49f1
+NERSC_HPSS_UUID=9cd89cfd-6d04-11e5-ba46-22000b92c6ec
+
+# 12 piControl ocean monthly files, 49 GB
+SRC_DATA=$base_dir/src_data
+DST_DATA=$base_dir/dst_data
+
+SRC_UUID=$LCRC_IMPROV_DTN_UUID
+DST_UUID=$LCRC_IMPROV_DTN_UUID
+
+# Optional
+TMP_CACHE=$base_dir/tmp_cache
+
+mkdir -p $SRC_DATA $DST_DATA $TMP_CACHE
+
+# Make maxsize 1 GB. This will create a new tar after every 1 GB of data.
+# (Since individual files are 4 GB, we will get 1 tarfile per datafile.)
+
+if [[ $NON_BLOCKING -eq 1 ]]; then
+    echo "TEST: NON_BLOCKING:"
+    zstash create -v --hpss=globus://$DST_UUID/$DST_DATA --maxsize 1 --non-blocking $SRC_DATA
+else
+    echo "TEST: BLOCKING:"
+    zstash create -v --hpss=globus://$DST_UUID/$DST_DATA --maxsize 1 $SRC_DATA
+    # zstash create -v --hpss=globus://$DST_UUID --maxsize 1 --non-blocking --cache $TMP_CACHE $SRC_DATA
+fi
+
+echo "Testing Completed"
+
+exit 0
+
diff --git a/zstash/create.py b/zstash/create.py
index e8819278..d16287bb 100644
--- a/zstash/create.py
+++ b/zstash/create.py
@@ -19,6 +19,7 @@
     get_files_to_archive,
     run_command,
     tars_table_exists,
+    ts_utc,
 )
 
 
@@ -37,7 +38,7 @@ def create():
         raise TypeError("Invalid config.hpss={}".format(config.hpss))
 
     # Start doing actual work
-    logger.debug("Running zstash create")
+    logger.debug(f"{ts_utc()}: Running zstash create")
     logger.debug("Local path : {}".format(path))
     logger.debug("HPSS path : {}".format(hpss))
     logger.debug("Max size : {}".format(config.maxsize))
@@ -54,11 +55,13 @@ def create():
     if hpss != "none":
         url = urlparse(hpss)
         if url.scheme == "globus":
+            # identify globus endpoints
+            logger.debug(f"{ts_utc()}: Calling globus_activate(hpss)")
             globus_activate(hpss)
         else:
             # config.hpss is not "none", so we need to
             # create target HPSS directory
-            logger.debug("Creating target HPSS directory")
+            logger.debug(f"{ts_utc()}: Creating target HPSS directory {hpss}")
             mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
             mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
             run_command(mkdir_command, mkdir_error_str)
@@ -71,7 +74,7 @@ def create():
         run_command(ls_command, ls_error_str)
 
     # Create cache directory
-    logger.debug("Creating local cache directory")
+    logger.debug(f"{ts_utc()}: Creating local cache directory")
    os.chdir(path)
     try:
         os.makedirs(cache)
@@ -84,11 +87,14 @@ def create():
 
     # TODO: Verify that cache is empty
     # Create and set up the database
+    logger.debug(f"{ts_utc()}: Calling create_database()")
     failures: List[str] = create_database(cache, args)
 
     # Transfer to HPSS. Always keep a local copy.
+ logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") hpss_put(hpss, get_db_filename(cache), cache, keep=True) + logger.debug(f"{ts_utc()}: calling globus_finalize()") globus_finalize(non_blocking=args.non_blocking) if len(failures) > 0: @@ -145,7 +151,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]: optional.add_argument( "--non-blocking", action="store_true", - help="do not wait for each Globus transfer until it completes.", + help="do not wait for each Globus transfer to complete before creating additional archive files. This option will use more intermediate disk-space, but can increase throughput.", ) optional.add_argument( "-v", "--verbose", action="store_true", help="increase output verbosity" @@ -185,7 +191,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]: def create_database(cache: str, args: argparse.Namespace) -> List[str]: # Create new database - logger.debug("Creating index database") + logger.debug(f"{ts_utc()}:Creating index database") if os.path.exists(get_db_filename(cache)): # Remove old database os.remove(get_db_filename(cache)) @@ -254,6 +260,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: args.keep, args.follow_symlinks, skip_tars_md5=args.no_tars_md5, + non_blocking=args.non_blocking, ) except FileNotFoundError: raise Exception("Archive creation failed due to broken symlink.") @@ -268,6 +275,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: args.keep, args.follow_symlinks, skip_tars_md5=args.no_tars_md5, + non_blocking=args.non_blocking, ) # Close database diff --git a/zstash/globus.py b/zstash/globus.py index 964baf9b..73a39fa0 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -13,6 +13,7 @@ from six.moves.urllib.parse import urlparse from .settings import logger +from .utils import ts_utc hpss_endpoint_map = { "ALCF": "de463ec4-6d04-11e5-ba46-22000b92c6ec", @@ -157,9 +158,10 @@ def file_exists(name: str) -> bool: return False -def globus_transfer( - remote_ep: str, remote_path: str, name: str, transfer_type: str -): # noqa: C901 +# C901 'globus_transfer' is too complex (20) +def globus_transfer( # noqa: C901 + remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool +): global transfer_client global local_endpoint global remote_endpoint @@ -167,6 +169,7 @@ def globus_transfer( global task_id global archive_directory_listing + logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") if not transfer_client: globus_activate("globus://" + remote_ep) if not transfer_client: @@ -216,21 +219,50 @@ def globus_transfer( try: if task_id: task = transfer_client.get_task(task_id) - if task["status"] == "ACTIVE": - return - elif task["status"] == "SUCCEEDED": + prev_task_status = task["status"] + # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE} + # NOTE: How we behave here depends upon whether we want to support mutliple active transfers. + # Presently, we do not, except inadvertantly (if status == PENDING) + if prev_task_status == "ACTIVE": + logger.info( + f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning." + ) + return "ACTIVE" + elif prev_task_status == "SUCCEEDED": + logger.info( + f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing." 
+                )
                 src_ep = task["source_endpoint_id"]
                 dst_ep = task["destination_endpoint_id"]
                 label = task["label"]
+                ts = ts_utc()
                 logger.info(
-                    "Globus transfer {}, from {} to {}: {} succeeded".format(
-                        task_id, src_ep, dst_ep, label
+                    "{}: Globus transfer {}, from {} to {}: {} succeeded".format(
+                        ts, task_id, src_ep, dst_ep, label
                     )
                 )
             else:
-                logger.error("Transfer FAILED")
+                logger.error(
+                    f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing."
+                )
+
+        # DEBUG: review accumulated items in TransferData
+        logger.info(f"{ts_utc()}: TransferData: accumulated items:")
+        attribs = transfer_data.__dict__
+        for item in attribs["data"]["DATA"]:
+            if item["DATA_TYPE"] == "transfer_item":
+                print(f"    source item: {item['source_path']}")
+
+        # SUBMIT new transfer here
+        logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
         task = submit_transfer_with_checks(transfer_data)
         task_id = task.get("task_id")
+        # NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
+        # the "label" given here refers only to the LAST tarfile in the TransferData list.
+        logger.info(
+            f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
+        )
+        transfer_data = None
     except TransferAPIError as e:
         if e.code == "NoCredException":
@@ -246,9 +278,66 @@
         logger.error("Exception: {}".format(e))
         sys.exit(1)
 
+    # test for blocking on new task_id
+    task_status = "UNKNOWN"
+    if not non_blocking:
+        task_status = globus_block_wait(
+            task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5
+        )
+    else:
+        logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")
+
+    if transfer_type == "put":
+        return task_status
+
     if transfer_type == "get" and task_id:
         globus_wait(task_id)
 
+    return task_status
+
+
+def globus_block_wait(
+    task_id: str, wait_timeout: int, polling_interval: int, max_retries: int
+):
+    global transfer_client
+
+    # Poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours; stop waiting after 5 * 2 = 10 hours.
+    logger.info(
+        f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
+    )
+    task_status = "UNKNOWN"
+    retry_count = 0
+    while retry_count < max_retries:
+        try:
+            # Wait for the task to complete
+            transfer_client.task_wait(
+                task_id, timeout=wait_timeout, polling_interval=10
+            )
+        except Exception as e:
+            logger.error(f"Unexpected Exception: {e}")
+        else:
+            curr_task = transfer_client.get_task(task_id)
+            task_status = curr_task["status"]
+            if task_status == "SUCCEEDED":
+                break
+        finally:
+            retry_count += 1
+            logger.info(
+                f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
+            )
+
+    if retry_count == max_retries:
+        logger.info(
+            f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} retries of timeout {wait_timeout} seconds"
+        )
+        task_status = "EXHAUSTED_TIMEOUT_RETRIES"
+
+    logger.info(
+        f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
+    )
+
+    return task_status
+
 
 def globus_wait(task_id: str):
     global transfer_client
diff --git a/zstash/hpss.py b/zstash/hpss.py
index a055fe99..44bc0f13 100644
--- a/zstash/hpss.py
+++ b/zstash/hpss.py
@@ -8,7 +8,7 @@
 
 from .globus import globus_transfer
 from .settings import get_db_filename, logger
-from .utils import run_command
+from .utils import run_command, ts_utc
 
 
 def hpss_transfer(
@@ -17,6 +17,7 @@ def hpss_transfer(
     transfer_type: str,
     cache: str,
     keep: bool = False,
+    non_blocking: bool = False,
 ):
     if hpss == "none":
         logger.info("{}: HPSS is unavailable".format(transfer_type))
@@ -86,8 +87,19 @@ def hpss_transfer(
         os.chdir(path)
 
     if scheme == "globus":
+        globus_status = "UNKNOWN"
         # Transfer file using the Globus Transfer Service
-        globus_transfer(endpoint, url_path, name, transfer_type)
+        logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})")
+        globus_status = globus_transfer(
+            endpoint, url_path, name, transfer_type, non_blocking
+        )
+        logger.info(
+            f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}"
+        )
+        # NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer
+        # or perhaps a hanging transfer. We should decide whether to ignore it or cancel it, but
+        # we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer
+        # return a tuple (task_id, status).
     else:
         # Transfer file using `hsi`
         command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
@@ -99,16 +111,20 @@ def hpss_transfer(
             os.chdir(cwd)
 
     if transfer_type == "put":
-        if not keep and scheme != "globus":
-            # We should not keep the local file, so delete it now that it is on HPSS
-            os.remove(file_path)
+        if not keep:
+            if (scheme != "globus") or (
+                globus_status == "SUCCEEDED" and not non_blocking
+            ):
+                os.remove(file_path)
 
 
-def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
+def hpss_put(
+    hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
+):
     """
     Put a file to the HPSS archive.
     """
""" - hpss_transfer(hpss, file_path, "put", cache, keep) + hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking) def hpss_get(hpss: str, file_path: str, cache: str): diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 456e6a52..707f73a8 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -14,7 +14,7 @@ from .hpss import hpss_put from .settings import BLOCK_SIZE, TupleFilesRowNoId, TupleTarsRowNoId, config, logger -from .utils import create_tars_table, tars_table_exists +from .utils import create_tars_table, tars_table_exists, ts_utc # Minimum output file object @@ -63,6 +63,7 @@ def add_files( keep: bool, follow_symlinks: bool, skip_tars_md5: bool = False, + non_blocking: bool = False, ) -> List[str]: # Now, perform the actual archiving @@ -87,7 +88,7 @@ def add_files( tname = "{0:0{1}x}".format(itar, 6) # Create the tar file name by adding ".tar" tfname = "{}.tar".format(tname) - logger.info("Creating new tar archive {}".format(tfname)) + logger.info(f"{ts_utc()}: Creating new tar archive {tfname}") # Open that tar file in the cache do_hash: bool if not skip_tars_md5: @@ -136,12 +137,13 @@ def add_files( if i == nfiles - 1 or tarsize + next_file_size > maxsize: # Close current temporary file - logger.debug("Closing tar archive {}".format(tfname)) + logger.debug(f"{ts_utc()}: Closing tar archive {tfname}") tar.close() tarsize = tarFileObject.tell() tar_md5: Optional[str] = tarFileObject.md5() tarFileObject.close() + logger.info(f"{ts_utc()}: (add_files): Completed archive file {tfname}") if not skip_tars_md5: tar_tuple: TupleTarsRowNoId = (tfname, tarsize, tar_md5) logger.info("tar name={}, tar size={}, tar md5={}".format(*tar_tuple)) @@ -156,7 +158,19 @@ def add_files( hpss: str = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, os.path.join(cache, tfname), cache, keep) + + # NOTE: These lines could be added under an "if debug" condition + # logger.info(f"{ts_utc()}: CONTENTS of CACHE upon call to hpss_put:") + # process = subprocess.run(["ls", "-l", "zstash"], capture_output=True, text=True) + # print(process.stdout) + + logger.info( + f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}" + ) + hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) + logger.info( + f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}" + ) # Update database with files that have been archived # Add a row to the "files" table, diff --git a/zstash/utils.py b/zstash/utils.py index b74c1954..ea793603 100644 --- a/zstash/utils.py +++ b/zstash/utils.py @@ -4,12 +4,17 @@ import shlex import sqlite3 import subprocess +from datetime import datetime, timezone from fnmatch import fnmatch from typing import Any, List, Tuple from .settings import TupleTarsRow, config, logger +def ts_utc(): + return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") + + def filter_files(subset: str, files: List[str], include: bool) -> List[str]: # Construct list of files to filter, based on