Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More improvements on TACA running Anglerfish #410

Merged
merged 8 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# TACA Version Log

## 20240202.1

Use abspath for Anglerfish stderr path, make it possible to instantiate ONT run w/o specifying the type, add more info to the ONT db update subcommand.

## 20240201.1

Fix bugs that changs in PR #404 were reverted in PR #411
Expand Down
50 changes: 33 additions & 17 deletions taca/nanopore/ONT_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ def __init__(self, run_abspath: str):
if v == "None":
self.rsync_options[k] = None

# Get transfer details, depending on run type and instrument
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

# Get DB
self.db = NanoporeRunsConnection(CONFIG["statusdb"], dbname="nanopore_runs")

Expand Down Expand Up @@ -123,13 +118,6 @@ def assert_contents(self):
assert self.has_file("/final_summary*.txt")
assert self.has_file("/pore_activity*.csv")

def is_transferred(self) -> bool:
"""Return True if run ID in transfer.tsv, else False."""
with open(self.transfer_details["transfer_log"]) as f:
return self.run_name in f.read()

# DB update

def touch_db_entry(self):
"""Check run vs statusdb. Create entry if there is none."""

Expand Down Expand Up @@ -172,6 +160,18 @@ def update_db_entry(self, force_update=False):
# Instantiate json (dict) to update the db with
db_update = {}

# Parse run path
db_update["run_path"] = (
open(f"{self.run_abspath}/run_path.txt").read().strip()
)

# Parse pore counts
pore_counts = []
with open(f"{self.run_abspath}/pore_count_history.csv") as stream:
for line in csv.DictReader(stream):
pore_counts.append(line)
db_update["pore_count_history"] = pore_counts

# Parse report_*.json
self.parse_minknow_json(db_update)

Expand Down Expand Up @@ -230,7 +230,7 @@ def parse_pore_activity(self, db_update):
def parse_minknow_json(self, db_update):
"""Parse useful stuff from the MinKNOW .json report to add to CouchDB"""

logger.info(f"{self.run_name}:Parsing report JSON...")
logger.info(f"{self.run_name}: Parsing report JSON...")

dict_json_report = json.load(open(self.get_file("/report*.json")))

Expand Down Expand Up @@ -377,16 +377,27 @@ class ONT_user_run(ONT_run):
"""ONT user run, has class methods and attributes specific to user runs."""

def __init__(self, run_abspath: str):
self.run_type = "user_run"
super().__init__(run_abspath)
self.run_type = "user_run"
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

def is_transferred(self) -> bool:
"""Return True if run ID in transfer.tsv, else False."""
with open(self.transfer_details["transfer_log"]) as f:
return self.run_name in f.read()


class ONT_qc_run(ONT_run):
"""ONT QC run, has class methods and attributes specific to QC runs"""

def __init__(self, run_abspath: str):
self.run_type = "qc_run"
super().__init__(run_abspath)
self.run_type = "qc_run"
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

# Get Anglerfish attributes from run
self.anglerfish_done_abspath = f"{self.run_abspath}/.anglerfish_done"
Expand All @@ -402,6 +413,11 @@ def __init__(self, run_abspath: str):
]
self.anglerfish_path = self.anglerfish_config["anglerfish_path"]

def is_transferred(self) -> bool:
"""Return True if run ID in transfer.tsv, else False."""
with open(self.transfer_details["transfer_log"]) as f:
return self.run_name in f.read()

# QC methods

def get_anglerfish_exit_code(self) -> Union[int, None]:
Expand Down Expand Up @@ -510,7 +526,7 @@ def run_anglerfish(self):
# Copy samplesheet used for traceability
shutil.copy(self.anglerfish_samplesheet, f"{taca_anglerfish_run_dir}/")
# Create files to dump subprocess std
stderr_relpath = f"{taca_anglerfish_run_dir}/stderr.txt"
stderr_abspath = f"{self.run_abspath}/{taca_anglerfish_run_dir}/stderr.txt"

full_command = [
# Dump subprocess PID into 'run-ongoing'-indicator file.
Expand All @@ -535,7 +551,7 @@ def run_anglerfish(self):
stream.write("\n".join(full_command))

# Start Anglerfish subprocess
with open(stderr_relpath, "w") as stderr:
with open(stderr_abspath, "w") as stderr:
process = subprocess.Popen(
f"bash {taca_anglerfish_run_dir}/command.sh",
shell=True,
Expand Down
Loading