diff --git a/jcvi/formats/maf.py b/jcvi/formats/maf.py index 3084a77c..01ed1ec5 100644 --- a/jcvi/formats/maf.py +++ b/jcvi/formats/maf.py @@ -7,13 +7,18 @@ """ import sys +from bisect import bisect +from dataclasses import dataclass + +from Bio import AlignIO +from Bio import SeqIO from bx import interval_index_file from bx.align import maf from ..apps.base import ActionDispatcher, OptionParser, need_update from ..apps.lastz import blastz_score_to_ncbi_expectation, blastz_score_to_ncbi_bits -from .base import BaseFile +from .base import BaseFile, logger class Maf(BaseFile, dict): @@ -58,16 +63,100 @@ def build_index(self, filename, indexfile): index_handle.close() +@dataclass +class Breakpoint: + arec: str + astart: int + brec: str + bstart: int + + def __str__(self): + return f"{self.arec}:{self.astart}-{self.brec}:{self.bstart}" + + def main(): actions = ( ("bed", "convert MAF to BED format"), ("blast", "convert MAF to BLAST tabular format"), + ("breakpoints", "find breakpoints in MAF and 'simulate' chimeric contigs"), ) p = ActionDispatcher(actions) p.dispatch(globals()) +def breakpoints(args): + """ + %prog breakpoints A.B.maf A.fa B.fa AB 1000000 2000000 + + Find breakpoints in MAF and 'simulate' chimeric contigs in `AB.fa`. + Breakpoints are 'roughly' around the user defined positions. The idea is + to simulate chimeric contigs, which are useful for testing algorithms, + e.g. klassify. + """ + p = OptionParser(breakpoints.__doc__) + p.add_argument( + "--minsize", + default=10000, + type=int, + help="Minimum size of alignment to consider", + ) + opts, args = p.parse_args(args) + + if len(args) not in (5, 6): + sys.exit(not p.print_help()) + + maf_file, a_fasta, b_fasta, ab = args[:4] + bps = sorted(int(x) for x in args[4:]) + minsize = opts.minsize + + filtered_msa = [] + for msa in AlignIO.parse(maf_file, "maf"): + arec, brec = msa + if brec.annotations["size"] < minsize: + continue + filtered_msa.append((brec.annotations["start"], arec, brec)) + logger.info("Total alignments: %d", len(filtered_msa)) + + final = [] + for bp in bps: + i = bisect(filtered_msa, (bp,)) + _, arec, brec = filtered_msa[i] + logger.info("Breakpoint at %d") + logger.info("%s", arec) + logger.info("%s", brec) + assert len(arec) == len(brec) + # Find the midpoint, safe to crossover there + midpoint = len(arec) // 2 + aseq = arec.seq[:midpoint] + astart = arec.annotations["start"] + len(aseq) - aseq.count("-") + bseq = brec.seq[:midpoint] + bstart = brec.annotations["start"] + len(bseq) - bseq.count("-") + bpt = Breakpoint(arec.id, astart, brec.id, bstart) + final.append(bpt) + + logger.info("Breakpoints found: %s", final) + # Load the sequences + ar = next(SeqIO.parse(a_fasta, "fasta")) + br = next(SeqIO.parse(b_fasta, "fasta")) + if len(final) == 2: + bp1, bp2 = final[:2] + # ====-------======= + # bp1 bp2 + abseq = ( + ar.seq[: bp1.astart] + + br.seq[bp1.bstart : bp2.bstart] + + ar.seq[bp2.astart :] + ) + elif len(final) == 1: + bp = final[0] + abseq = ar.seq[: bp.astart] + br.seq[bp.bstart :] + abrec = SeqIO.SeqRecord(abseq, id=ab, description="") + ab_fasta = f"{ab}.fa" + SeqIO.write([abrec], ab_fasta, "fasta") + logger.info("Writing to %s", ab_fasta) + + def bed(args): """ %prog bed maffiles > out.bed @@ -76,8 +165,7 @@ def bed(args): then useful to check coverage, etc. """ p = OptionParser(bed.__doc__) - - opts, args = p.parse_args(args) + _, args = p.parse_args(args) if len(args) != 1: sys.exit(p.print_help()) @@ -129,6 +217,9 @@ def alignment_details(a, b): def maf_to_blast8(f): + """ + Convert a MAF file to BLAST tabular format. + """ reader = Maf(f).reader for rec in reader: a, b = rec.components @@ -174,7 +265,7 @@ def blast(args): From a folder of .maf files, generate .blast file with tabular format. """ p = OptionParser(blast.__doc__) - opts, args = p.parse_args(args) + _, args = p.parse_args(args) if len(args) == 0: sys.exit(p.print_help())