Extend --hpss to accept a Globus URL #154

Merged (16 commits, Jan 11, 2022)
3 changes: 3 additions & 0 deletions .gitignore
@@ -14,3 +14,6 @@ venv/

# Sphinx documentation
docs/_build/

# macOS Desktop Services Store
.DS_Store
7 changes: 3 additions & 4 deletions docs/source/tutorial.rst
@@ -182,10 +182,9 @@ NERSC machines have HPSS access. We can choose to use HPSS by setting
.. note::
Before using zstash with HPSS for the first time, run ``hsi`` on NERSC
and enter your credentials. Then, ``zstash`` will be able to access HPSS.
Compy does not have HPSS access. Therefore, you’ll need to set
``--hpss=none`` when using it. For long term storage, zstash archives
created locally on Compy should be transferred to an off-site HPSS storage using Globus
(:ref:`globus-compy`)
Compy and Anvil do not have HPSS access. Therefore, you’ll need to use a Globus URL
that points to a remote HPSS storage, ``--hpss=globus://<Globus endpoint UUID>/<path>``,
or set ``--hpss=none``.


Examples
26 changes: 25 additions & 1 deletion docs/source/usage.rst
@@ -33,7 +33,10 @@ where
system where the archive files will be stored. This directory should be **unique** for each
zstash archive. If ``--hpss=none``, then files will be archived locally instead of being
transferred to HPSS. The ``none`` option should be used when running zstash on a machine
without HPSS. The option also accepts a Globus URL, ``globus://<Globus endpoint UUID>/<path to archive>``,
in which case zstash will use `Globus <https://globus.org/>`_ to store the new zstash archive on the given Globus endpoint.
The names ``alcf`` and ``nersc`` are recognized as referring to the ALCF HPSS and NERSC HPSS endpoints,
e.g. ``globus://nersc/~/my_archive``.
* ``<local path>`` specifies the path to the local directory that should be archived.

Additional optional arguments:
@@ -95,6 +98,27 @@ to conserve storage space: ::
This exclude pattern will skip all restart subdirectories under the short-term archive,
except for those with years ending in '0' or '5'.

Example with Globus
-------------------
If you run zstash on a system that lacks an HPSS file system but has a `Globus <https://app.globus.org/endpoints>`_ endpoint set up,
you can use a Globus URL: ::

$ cd $CSCRATCH/ACME_simulations/20170731.F20TR.ne30_ne30.anvil
$ zstash create --hpss=globus://9cd89cfd-6d04-11e5-ba46-22000b92c6ec/~/test/E3SM_simulations/20170731.F20TR.ne30_ne30.anvil .

Here, 9cd89cfd-6d04-11e5-ba46-22000b92c6ec is the UUID of the NERSC HPSS Globus endpoint. The two names ``nersc`` and ``alcf``
are recognized by zstash and substituted internally with the corresponding Globus UUIDs
for the NERSC HPSS Globus endpoint (9cd89cfd-6d04-11e5-ba46-22000b92c6ec) and
the ALCF HPSS Globus endpoint (de463ec4-6d04-11e5-ba46-22000b92c6ec).
If you want to store a zstash archive on either of these remote HPSS file systems, you can use the names instead of the UUIDs: ::

$ zstash create --hpss=globus://nersc/~/test/E3SM_simulations/20170731.F20TR.ne30_ne30.anvil .

.. note::
If you are a new Globus user, you should first do a small transfer to test functionality.

.. note::
Always activate Globus endpoints via the Globus web interface before running ``zstash``.
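The activation check that the test suite performs (see ``tests/test_globus.py`` below) can be distilled into a small pre-flight helper. ``ensure_activated`` is an illustrative name, not a zstash function; ``transfer_client`` is assumed to be a ``globus_sdk.TransferClient``: ::

```python
def ensure_activated(transfer_client, ep_id, expires_in=600):
    """Ask the Globus Transfer service to auto-activate an endpoint;
    raise if manual (re-)activation on the Globus website is required."""
    r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=expires_in)
    if r.get("code") == "AutoActivationFailed":
        raise RuntimeError(
            "Endpoint {0} is not activated (or its activation expires soon); "
            "visit https://app.globus.org/file-manager/collections/{0} "
            "to (re)activate it.".format(ep_id)
        )
    return r
```

Auto-activation only succeeds for endpoints with cached or automatic credentials; anything else still requires the web interface, which is why the note above recommends activating endpoints there first.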

Check
=====
5 changes: 5 additions & 0 deletions setup.py
@@ -7,5 +7,10 @@
author_email="forsyth2@llnl.gov, golaz1@llnl.gov, shaheen2@llnl.gov",
description="Long term HPSS archiving software for E3SM",
packages=find_packages(include=["zstash", "zstash.*"]),
install_requires=[
"six==1.16.0",
"globus-sdk==2.0.1",
"fair-research-login==0.2.0",
],
entry_points={"console_scripts": ["zstash=zstash.main:main"]},
)
184 changes: 184 additions & 0 deletions tests/test_globus.py
@@ -0,0 +1,184 @@
import configparser
import os
import re
import shutil
import socket
import unittest

from fair_research_login.client import NativeClient
from globus_sdk import DeleteData, TransferClient
from globus_sdk.exc import TransferAPIError

from tests.base import TOP_LEVEL, ZSTASH_PATH, TestZstash, print_starred, run_cmd

# Use 'Globus Tutorial Endpoint 1' to simulate an HPSS Globus endpoint
hpss_globus_endpoint = "ddb59aef-6d04-11e5-ba46-22000b92c6ec"

regex_endpoint_map = {
r"theta.*\.alcf\.anl\.gov": "08925f04-569f-11e7-bef8-22000b9a448b",
r"blueslogin.*\.lcrc\.anl\.gov": "61f9954c-a4fa-11ea-8f07-0a21f750d19b",
r"chr.*\.lcrc\.anl\.gov": "61f9954c-a4fa-11ea-8f07-0a21f750d19b",
r"cori.*\.nersc\.gov": "9d6d99eb-6d04-11e5-ba46-22000b92c6ec",
}


class TestGlobus(TestZstash):
def preactivate_globus(self):
"""
Read the local Globus endpoint UUID from ~/.zstash.ini.
If the ini file does not exist, create one with empty values
and try to find the local endpoint UUID based on the FQDN.
"""
local_endpoint = None
ini_path = os.path.expanduser("~/.zstash.ini")
ini = configparser.ConfigParser()
if ini.read(ini_path):
if "local" in ini.sections():
local_endpoint = ini["local"].get("globus_endpoint_uuid")
else:
ini["local"] = {"globus_endpoint_uuid": ""}
try:
with open(ini_path, "w") as f:
ini.write(f)
except Exception as e:
self.fail(e)
if not local_endpoint:
fqdn = socket.getfqdn()
for pattern in regex_endpoint_map.keys():
if re.fullmatch(pattern, fqdn):
local_endpoint = regex_endpoint_map.get(pattern)
break
if not local_endpoint:
self.fail("{} does not have the local Globus endpoint set".format(ini_path))

native_client = NativeClient(
client_id="6c1629cf-446c-49e7-af95-323c6412397f",
app_name="Zstash",
default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all",
)
native_client.login(no_local_server=True, refresh_tokens=True)
transfer_authorizer = native_client.get_authorizers().get(
"transfer.api.globus.org"
)
self.transfer_client = TransferClient(transfer_authorizer)

for ep_id in [hpss_globus_endpoint, local_endpoint]:
r = self.transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600)
if r.get("code") == "AutoActivationFailed":
self.fail(
"The {} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{} and (re)-activate the endpoint.".format(
ep_id, ep_id
)
)

def delete_files_globus(self):
ep_id = hpss_globus_endpoint
r = self.transfer_client.endpoint_autoactivate(ep_id, if_expires_in=60)
if r.get("code") == "AutoActivationFailed":
self.fail(
"The {} endpoint is not activated. Please go to https://app.globus.org/file-manager/collections/{} and activate the endpoint.".format(
ep_id, ep_id
)
)

ddata = DeleteData(self.transfer_client, hpss_globus_endpoint, recursive=True)
ddata.add_item("/~/zstash_test/")
try:
task = self.transfer_client.submit_delete(ddata)
task_id = task.get("task_id")
"""
A Globus transfer job (task) can be in one of three states:
ACTIVE, SUCCEEDED, or FAILED. Poll the status of the task from the
Globus Transfer service every 5 seconds, with a 5-second timeout.
'task_wait' returns False if the task is still ACTIVE when time runs
out, and True otherwise.
"""
while not self.transfer_client.task_wait(task_id, 5, 5):
task = self.transfer_client.get_task(task_id)
if task.get("is_paused"):
break
"""
The Globus transfer job (task) has either finished (SUCCEEDED or FAILED)
or is still ACTIVE. Check whether the transfer SUCCEEDED or FAILED.
"""
task = self.transfer_client.get_task(task_id)
if task["status"] == "SUCCEEDED":
pass
elif task.get("status") == "ACTIVE":
if task.get("is_paused"):
pause_info = self.transfer_client.task_pause_info(task_id)
paused_rules = pause_info.get("pause_rules")
reason = paused_rules[0].get("message")
message = "The task was paused. Reason: {}".format(reason)
print(message)
else:
message = "The task reached a {} second deadline\n".format(
24 * 3600
)
print(message)
self.transfer_client.cancel_task(task_id)
else:
print("Globus delete FAILED")
except TransferAPIError as e:
if e.code == "NoCredException":
self.fail(
"{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
e.message
)
)
else:
self.fail(e)
except Exception as e:
self.fail("{} - exception: {}".format(self, e))

def tearDown(self):
"""
Tear down a test. This is run after every test method.

After the script has failed or completed, remove all created files, even those on the HPSS repo.
"""
os.chdir(TOP_LEVEL)
print("Removing test files, both locally and at the HPSS repo")
# self.cache may appear in any of these directories,
# but should not appear at the same level as these.
# Therefore, there is no need to explicitly remove it.
for d in [self.test_dir, self.backup_dir]:
if os.path.exists(d):
shutil.rmtree(d)

if self.hpss_path and self.hpss_path.lower().startswith("globus:"):
self.delete_files_globus()

def helperLsGlobus(self, test_name, hpss_path, cache=None, zstash_path=ZSTASH_PATH):
"""
Test `zstash ls --hpss=globus://...`.
"""
self.preactivate_globus()
self.hpss_path = hpss_path
if cache:
# Override default cache
self.cache = cache
cache_option = " --cache={}".format(self.cache)
else:
cache_option = ""
use_hpss = self.setupDirs(test_name)
self.create(use_hpss, zstash_path, cache=self.cache)
self.assertWorkspace()
os.chdir(self.test_dir)
for option in ["", "-v", "-l"]:
print_starred("Testing zstash ls {}".format(option))
cmd = "{}zstash ls{} {} --hpss={}".format(
zstash_path, cache_option, option, self.hpss_path
)
output, err = run_cmd(cmd)
self.check_strings(cmd, output + err, ["file0.txt"], ["ERROR"])
os.chdir(TOP_LEVEL)

def testLs(self):
self.helperLsGlobus(
"testLsGlobus", f"globus://{hpss_globus_endpoint}/~/zstash_test/"
)


if __name__ == "__main__":
unittest.main()
55 changes: 39 additions & 16 deletions zstash/create.py
@@ -8,6 +8,8 @@
import sys
from typing import Any, List, Tuple

from six.moves.urllib.parse import urlparse

from .hpss import hpss_put
from .hpss_utils import add_files
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
@@ -50,19 +52,21 @@ def create():
raise NotADirectoryError(input_path_error_str)

if hpss != "none":
# config.hpss is not "none", so we need to
# create target HPSS directory
logger.debug("Creating target HPSS directory")
mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
run_command(mkdir_command, mkdir_error_str)

# Make sure it is exists and is empty
logger.debug("Making sure target HPSS directory exists and is empty")

ls_command: str = 'hsi -q "cd {}; ls -l"'.format(hpss)
ls_error_str: str = "Target HPSS directory is not empty"
run_command(ls_command, ls_error_str)
url = urlparse(hpss)
if url.scheme != "globus":
# config.hpss is not "none", so we need to
# create target HPSS directory
logger.debug("Creating target HPSS directory")
mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
run_command(mkdir_command, mkdir_error_str)

# Make sure it exists and is empty
logger.debug("Making sure target HPSS directory exists and is empty")

ls_command: str = 'hsi -q "cd {}; ls -l"'.format(hpss)
ls_error_str: str = "Target HPSS directory is not empty"
run_command(ls_command, ls_error_str)

# Create cache directory
logger.debug("Creating local cache directory")
@@ -81,7 +85,9 @@
failures: List[str] = create_database(cache, args)

# Transfer to HPSS. Always keep a local copy.
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)

if len(failures) > 0:
# List the failures
@@ -102,7 +108,11 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
required.add_argument(
"--hpss",
type=str,
help='path to storage on HPSS. Set to "none" for local archiving. Must be set to "none" if the machine does not have HPSS access.',
help=(
'path to storage on HPSS. Set to "none" for local archiving. It also can be a Globus URL, '
'globus://<GLOBUS_ENDPOINT_UUID>/<PATH>. Names "alcf" and "nersc" are recognized as referring to the ALCF HPSS '
"and NERSC HPSS endpoints, e.g. globus://nersc/~/my_archive."
),
required=True,
)
optional: argparse._ArgumentGroup = parser.add_argument_group(
@@ -127,6 +137,11 @@
type=str,
help='the path to the zstash archive on the local file system. The default name is "zstash".',
)
optional.add_argument(
"--non-blocking",
action="store_true",
help="do not wait for each Globus transfer to complete.",
)
optional.add_argument(
"-v", "--verbose", action="store_true", help="increase output verbosity"
)
@@ -140,6 +155,8 @@
args: argparse.Namespace = parser.parse_args(sys.argv[2:])
if args.hpss and args.hpss.lower() == "none":
args.hpss = "none"
if args.non_blocking:
args.keep = True
if args.verbose:
logger.setLevel(logging.DEBUG)

@@ -217,7 +234,13 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:

# Add files to archive
failures: List[str] = add_files(
cur, con, -1, files, cache, skip_tars_md5=args.no_tars_md5
cur,
con,
-1,
files,
cache,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
6 changes: 5 additions & 1 deletion zstash/extract.py
@@ -70,7 +70,11 @@ def setup_extract() -> Tuple[argparse.Namespace, str]:
optional.add_argument(
"--hpss",
type=str,
help='path to storage on HPSS. Set to "none" for local archiving. Must be set to "none" if the machine does not have HPSS access.',
help=(
'path to storage on HPSS. Set to "none" for local archiving. It also can be a Globus URL, '
'globus://<GLOBUS_ENDPOINT_UUID>/<PATH>. Names "alcf" and "nersc" are recognized as referring to the ALCF HPSS '
"and NERSC HPSS endpoints, e.g. globus://nersc/~/my_archive."
),
)
optional.add_argument(
"--workers", type=int, default=1, help="num of multiprocess workers"