Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More improvements on TACA running Anglerfish #410

Merged
merged 8 commits into from
Feb 5, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions taca/nanopore/ONT_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ def __init__(self, run_abspath: str):
self.rsync_options[k] = None

# Get transfer details, depending on run type and instrument
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]
if hasattr(self, "run_type"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would change this to set `self.run_type = None  # Will be set in subclasses` and use `if self.run_type is not None`. I think it's easier to understand. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or perhaps have `run_type` as a required argument to the base class as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

# Get DB
self.db = NanoporeRunsConnection(CONFIG["statusdb"], dbname="nanopore_runs")
Expand Down Expand Up @@ -159,7 +160,7 @@ def update_db_entry(self, force_update=False):
self.touch_db_entry()

# If the run document is marked as "ongoing" or database is being manually updated
if self.db.check_run_status(self) == "ongoing" or force_update == True:
if self.db.check_run_status(self) == "ongoing" or force_update is True:
logger.info(
f"{self.run_name}: Run exists in the database with run status: {self.db.check_run_status(self)}."
)
Expand All @@ -169,6 +170,16 @@ def update_db_entry(self, force_update=False):
# Instantiate json (dict) to update the db with
db_update = {}

# Parse run path
db_update["run_path"] = open(f"{self.run_abspath}/run_path.txt", "r").read().strip()

# Parse pore counts
pore_counts = []
with open(f"{self.run_abspath}/pore_count_history.csv", "r") as stream:
for line in csv.DictReader(stream):
pore_counts.append(line)
db_update["pore_count_history"] = pore_counts

# Parse report_*.json
self.parse_minknow_json(db_update)

Expand Down Expand Up @@ -228,7 +239,7 @@ def parse_pore_activity(self, db_update):
def parse_minknow_json(self, db_update):
"""Parse useful stuff from the MinKNOW .json report to add to CouchDB"""

logger.info(f"{self.run_name}:Parsing report JSON...")
logger.info(f"{self.run_name}: Parsing report JSON...")

dict_json_report = json.load(open(self.get_file("/report*.json"), "r"))

Expand Down Expand Up @@ -505,7 +516,7 @@ def run_anglerfish(self):
# Copy samplesheet used for traceability
shutil.copy(self.anglerfish_samplesheet, f"{taca_anglerfish_run_dir}/")
# Create files to dump subprocess std
stderr_relpath = f"{taca_anglerfish_run_dir}/stderr.txt"
stderr_abspath = f"{self.run_abspath}/{taca_anglerfish_run_dir}/stderr.txt"

full_command = [
# Dump subprocess PID into 'run-ongoing'-indicator file.
Expand All @@ -529,7 +540,7 @@ def run_anglerfish(self):
stream.write("\n".join(full_command))

# Start Anglerfish subprocess
with open(stderr_relpath, 'w') as stderr:
with open(stderr_abspath, 'w') as stderr:
process = subprocess.Popen(
f"bash {taca_anglerfish_run_dir}/command.sh",
shell=True,
Expand Down