Skip to content

Commit

Permalink
Switch to tarball and skip compression
Browse files Browse the repository at this point in the history
  • Loading branch information
chuan-wang committed Oct 16, 2024
1 parent 04ee3f3 commit 4a18d45
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 44 deletions.
4 changes: 4 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# TACA Version Log

## 20241016.1

Switch to tarball and skip compression

## 20241007.1

Update taca-ngi-pipeline repo URL in README
Expand Down
4 changes: 2 additions & 2 deletions taca/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def transfer_runfolder(run_dir, pid, exclude_lane):

# Create a tar archive of the runfolder
dir_name = os.path.basename(run_dir)
archive = run_dir + "_" + "_".join(pid_list) + ".tar.gz"
archive = run_dir + "_" + "_".join(pid_list) + ".tar"
run_dir_path = os.path.dirname(run_dir)

# Prepare the options for excluding lanes
Expand Down Expand Up @@ -263,7 +263,7 @@ def transfer_runfolder(run_dir, pid, exclude_lane):
subprocess.call(
["tar"]
+ exclude_options_for_tar
+ ["-cvzf", archive, "-C", run_dir_path, dir_name]
+ ["-cvf", archive, "-C", run_dir_path, dir_name]
)
except subprocess.CalledProcessError as e:
logger.error("Error creating tar archive")
Expand Down
81 changes: 40 additions & 41 deletions taca/backup/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def __init__(self, run, archive_path):
self.abs_path = os.path.abspath(run)
self.path, self.name = os.path.split(self.abs_path)
self.name = self.name.split(".", 1)[0]
self.zip = os.path.join(archive_path, f"{self.name}.tar.gz")
self.tar = os.path.join(archive_path, f"{self.name}.tar")
self.key = f"{self.name}.key"
self.key_encrypted = f"{self.name}.key.gpg"
self.zip_encrypted = os.path.join(archive_path, f"{self.name}.tar.gz.gpg")
self.tar_encrypted = os.path.join(archive_path, f"{self.name}.tar.gpg")


class backup_utils:
Expand Down Expand Up @@ -286,25 +286,25 @@ def _is_ready_to_archive(self, run, ext):
if (
os.path.exists(rta_file)
and os.path.exists(cp_file)
and (not self.file_in_pdc(run.zip_encrypted))
and (not self.file_in_pdc(run.tar_encrypted))
) or (
self._get_run_type(run.name) in ["promethion", "minion"]
and os.path.exists(os.path.join(run_path, ".sync_finished"))
):
# Case for encrypting
# Run has NOT been encrypted (run.tar.gz.gpg not exists)
if ext == ".tar.gz" and (not os.path.exists(run.zip_encrypted)):
# Run has NOT been encrypted (run.tar.gpg not exists)
if ext == ".tar" and (not os.path.exists(run.tar_encrypted)):
logger.info(
f"Sequencing has finished and copying completed for run {os.path.basename(run_path)} and is ready for archiving"
)
archive_ready = True
# Case for putting data to PDC
# Run has already been encrypted (run.tar.gz.gpg exists)
elif ext == ".tar.gz.gpg" and os.path.exists(run.zip_encrypted):
logger.info(
f"Sequencing has finished and copying completed for run {os.path.basename(run_path)} and is ready for sending to PDC"
)
archive_ready = True
# Run has already been encrypted (run.tar.gpg exists)
elif ext == ".tar.gpg" and os.path.exists(run.tar_encrypted):
logger.info(
f"Sequencing has finished and copying completed for run {os.path.basename(run_path)} and is ready for sending to PDC"
)
archive_ready = True

return archive_ready

Expand All @@ -328,12 +328,12 @@ def _move_run_to_archived(self, run):
def encrypt_runs(cls, run, force):
"""Encrypt the runs that have been collected."""
bk = cls(run)
bk.collect_runs(ext=".tar.gz")
bk.collect_runs(ext=".tar")
logger.info(f"In total, found {len(bk.runs)} run(s) to be encrypted")
for run in bk.runs:
run.flag = f"{run.name}.encrypting"
run.dst_key_encrypted = os.path.join(bk.keys_path, run.key_encrypted)
tmp_files = [run.zip_encrypted, run.key_encrypted, run.key, run.flag]
tmp_files = [run.tar_encrypted, run.key_encrypted, run.key, run.flag]
logger.info(f"Encryption of run {run.name} is now started")
# Check if there is enough space and exit if not
bk.avail_disk_space(run.path, run.name)
Expand All @@ -357,44 +357,43 @@ def encrypt_runs(cls, run, force):
)
continue
open(run.flag, "w").close()
# zip the run directory
if os.path.exists(run.zip):
# Make run directory tarball
if os.path.exists(run.tar):
if os.path.isdir(run.name):
logger.warn(
f"Both run source and zipped archive exist for run {run.name}, skipping run as precaution"
f"Both run source and archive tarball exist for run {run.name}, skipping run as precaution"
)
bk._clean_tmp_files([run.flag])
continue
logger.info(
f"Zipped archive already exist for run {run.name}, so using it for encryption"
f"Archive tarball already exist for run {run.name}, so using it for encryption"
)
else:
exclude_files = " ".join(
[f"--exclude {x}" for x in bk.exclude_list]
)
logger.info(f"Creating zipped archive for run {run.name}")
logger.info(f"Creating archive tarball for run {run.name}")
if bk._call_commands(
cmd1=f"tar {exclude_files} -cf - {run.name}",
cmd2="pigz --fast -c -",
out_file=run.zip,
out_file=run.tar,
mail_failed=True,
tmp_files=[run.zip, run.flag],
tmp_files=[run.tar, run.flag],
):
logger.info(
f"Run {run.name} was successfully compressed and transferred to {run.zip}"
f"Run {run.name} was successfully tarballed and transferred to {run.tar}"
)
else:
logger.warn(f"Skipping run {run.name} and moving on")
continue
# Remove encrypted file if already exists
if os.path.exists(run.zip_encrypted):
if os.path.exists(run.tar_encrypted):
logger.warn(
f"Removing already existing encrypted file for run {run.name}, this is a precaution "
"to make sure the file was encrypted with correct key file"
)
bk._clean_tmp_files(
[
run.zip_encrypted,
run.tar_encrypted,
run.key,
run.key_encrypted,
run.dst_key_encrypted,
Expand All @@ -411,18 +410,18 @@ def encrypt_runs(cls, run, force):
if not force:
logger.info("Calculating md5sum before encryption")
md5_call, md5_out = bk._call_commands(
cmd1=f"md5sum {run.zip}", return_out=True, tmp_files=tmp_files
cmd1=f"md5sum {run.tar}", return_out=True, tmp_files=tmp_files
)
if not md5_call:
logger.warn(f"Skipping run {run.name} and moving on")
continue
md5_pre_encrypt = md5_out.split()[0]
# Encrypt the zipped run file
logger.info("Encrypting the zipped run file")
# Encrypt the tar run file
logger.info("Encrypting the tar run file")
if not bk._call_commands(
cmd1=(
f"gpg --symmetric --cipher-algo aes256 --passphrase-file {run.key} --batch --compress-algo "
f"none -o {run.zip_encrypted} {run.zip}"
f"none -o {run.tar_encrypted} {run.tar}"
),
tmp_files=tmp_files,
):
Expand All @@ -432,7 +431,7 @@ def encrypt_runs(cls, run, force):
if not force:
logger.info("Calculating md5sum after encryption")
md5_call, md5_out = bk._call_commands(
cmd1=f"gpg --decrypt --cipher-algo aes256 --passphrase-file {run.key} --batch {run.zip_encrypted}",
cmd1=f"gpg --decrypt --cipher-algo aes256 --passphrase-file {run.key} --batch {run.tar_encrypted}",
cmd2="md5sum",
return_out=True,
tmp_files=tmp_files,
Expand All @@ -457,16 +456,16 @@ def encrypt_runs(cls, run, force):
else:
logger.error("Encryption of key file failed, skipping run")
continue
bk._clean_tmp_files([run.zip, run.key, run.flag])
bk._clean_tmp_files([run.tar, run.key, run.flag])
logger.info(
f"Encryption of run {run.name} is successfully done, removing zipped run file"
f"Encryption of run {run.name} is successfully done, removing run folder tarball"
)

@classmethod
def pdc_put(cls, run):
"""Archive the collected runs to PDC."""
bk = cls(run)
bk.collect_runs(ext=".tar.gz.gpg", filter_by_ext=True)
bk.collect_runs(ext=".tar.gpg", filter_by_ext=True)
logger.info(f"In total, found {len(bk.runs)} run(s) to send PDC")
for run in bk.runs:
run.flag = f"{run.name}.archiving"
Expand All @@ -481,7 +480,7 @@ def pdc_put(cls, run):
continue
if not os.path.exists(run.dst_key_encrypted):
logger.error(
f"Encrypted key file {run.dst_key_encrypted} is not found for file {run.zip_encrypted}, skipping it"
f"Encrypted key file {run.dst_key_encrypted} is not found for file {run.tar_encrypted}, skipping it"
)
continue
with filesystem.chdir(run.path):
Expand All @@ -497,17 +496,17 @@ def pdc_put(cls, run):
f"Run {run.name} is already being archived, so skipping now"
)
continue
if bk.file_in_pdc(run.zip_encrypted, silent=False) or bk.file_in_pdc(
if bk.file_in_pdc(run.tar_encrypted, silent=False) or bk.file_in_pdc(
run.dst_key_encrypted, silent=False
):
logger.warn(
f"Seems like files related to run {run.name} already exist in PDC, check and cleanup"
)
continue
open(run.flag, "w").close()
logger.info(f"Sending file {run.zip_encrypted} to PDC")
logger.info(f"Sending file {run.tar_encrypted} to PDC")
if bk._call_commands(
cmd1=f"dsmc archive {run.zip_encrypted}", tmp_files=[run.flag]
cmd1=f"dsmc archive {run.tar_encrypted}", tmp_files=[run.flag]
):
time.sleep(15) # give some time just in case 'dsmc' needs to settle
if bk._call_commands(
Expand All @@ -517,18 +516,18 @@ def pdc_put(cls, run):
time.sleep(
5
) # give some time just in case 'dsmc' needs to settle
if bk.file_in_pdc(run.zip_encrypted) and bk.file_in_pdc(
if bk.file_in_pdc(run.tar_encrypted) and bk.file_in_pdc(
run.dst_key_encrypted
):
logger.info(
f"Successfully sent file {run.zip_encrypted} to PDC, moving file locally from {run.path} to archived folder"
f"Successfully sent file {run.tar_encrypted} to PDC, moving file locally from {run.path} to archived folder"
)
bk.log_archived_run(run.zip_encrypted)
bk.log_archived_run(run.tar_encrypted)
if bk.couch_info:
bk._log_pdc_statusdb(run.name)
bk._clean_tmp_files(
[run.zip_encrypted, run.dst_key_encrypted, run.flag]
[run.tar_encrypted, run.dst_key_encrypted, run.flag]
)
bk._move_run_to_archived(run)
continue
logger.warn(f"Sending file {run.zip_encrypted} to PDC failed")
logger.warn(f"Sending file {run.tar_encrypted} to PDC failed")
2 changes: 1 addition & 1 deletion taca/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def backup(ctx):
"-r",
"--run",
type=click.Path(exists=True),
help="A run (directory or a zipped archive) to be encrypted",
help="A run (directory or a tar archive) to be encrypted",
)
@click.option(
"-f",
Expand Down

0 comments on commit 4a18d45

Please sign in to comment.