Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup in org.bdgenomics.adam.converters package. #1062

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
*/
package org.bdgenomics.adam.cli

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.VariantAnnotationConverter
import org.bdgenomics.adam.models.VariantContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro._
Expand Down Expand Up @@ -59,7 +58,7 @@ class VcfAnnotation2ADAM(val args: VcfAnnotation2ADAMArgs) extends BDGSparkComma
val existingAnnotations: RDD[DatabaseVariantAnnotation] = sc.loadVariantAnnotations(args.currentAnnotations)
val keyedAnnotations = existingAnnotations.keyBy(anno => new RichVariant(anno.getVariant))
val joinedAnnotations = keyedAnnotations.join(annotations.keyBy(anno => new RichVariant(anno.getVariant)))
val mergedAnnotations = joinedAnnotations.map(kv => VariantAnnotationConverter.mergeAnnotations(kv._2._1, kv._2._2))
val mergedAnnotations = joinedAnnotations.map(kv => VariantContext.mergeAnnotations(kv._2._1, kv._2._2))
mergedAnnotations.saveAsParquet(args)
} else {
annotations.saveAsParquet(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }
import scala.collection.JavaConversions._

class AlignmentRecordConverter extends Serializable {
/**
* This class contains methods to convert AlignmentRecords to other formats.
*/
private[adam] class AlignmentRecordConverter extends Serializable {

/**
* Converts a single record to FASTQ. FASTQ format is:
Expand All @@ -36,7 +39,16 @@ class AlignmentRecordConverter extends Serializable {
* ASCII quality scores
* }}}
*
* If the base qualities are unknown (qual is null or equals "*"), the quality
* scores will be a repeated string of 'B's that is equal to the read length.
*
* @param adamRecord Read to convert to FASTQ.
* @param maybeAddSuffix If true, check if a "/%d" suffix is attached to the
* read. If there is no suffix, a slash and the number of the read in the
* sequenced fragment is appended to the readname. Default is false.
* @param outputOriginalBaseQualities If true and the original base quality
* field is set (SAM "OQ" tag), outputs the original qualities. Else,
* output the qual field. Defaults to false.
* @return Returns this read in string form.
*/
def convertToFastq(
Expand Down Expand Up @@ -90,8 +102,10 @@ class AlignmentRecordConverter extends Serializable {
* Converts a single ADAM record into a SAM record.
*
* @param adamRecord ADAM formatted alignment record to convert.
* @param header SAM file header to use.
* @return Returns the record converted to SAMtools format. Can be used for output to SAM/BAM.
* @param header SAM file header to attach to the record.
* @param rgd Dictionary describing the read groups that are in the RDD that
* this read is from.
* @return Returns the record converted to htsjdk format. Can be used for output to SAM/BAM.
*/
def convert(adamRecord: AlignmentRecord,
header: SAMFileHeaderWritable,
Expand Down Expand Up @@ -202,7 +216,8 @@ class AlignmentRecordConverter extends Serializable {
* @param rgd Dictionary containing record groups.
* @return Converted SAM formatted record.
*/
def createSAMHeader(sd: SequenceDictionary, rgd: RecordGroupDictionary): SAMFileHeader = {
def createSAMHeader(sd: SequenceDictionary,
rgd: RecordGroupDictionary): SAMFileHeader = {
val samSequenceDictionary = sd.toSAMSequenceDictionary
val samHeader = new SAMFileHeader
samHeader.setSequenceDictionary(samSequenceDictionary)
Expand All @@ -224,13 +239,29 @@ class AlignmentRecordConverter extends Serializable {
}
}

object AlignmentRecordConverter {
/**
* Singleton object to assist with converting AlignmentRecords.
*
* Singleton object exists due to cross reference from
* org.bdgenomics.adam.rdd.read.AlignmentRecordRDDFunctions.
*/
private[adam] object AlignmentRecordConverter extends Serializable {

/**
* Checks to see if a read name has a index suffix.
*
* Read names frequently end in a "/%d" suffix, where the digit at the end
* signifies the number of this read in the sequenced fragment. E.g., for an
* Illumina paired-end protocol, the first read in the pair will have a "/1"
* suffix, while the second read in the pair will have a "/2" suffix.
*
* @param adamRecord Record to check.
* @return True if the read ends in a read number suffix.
*/
def readNameHasPairedSuffix(adamRecord: AlignmentRecord): Boolean = {
adamRecord.getReadName.length() > 2 &&
adamRecord.getReadName.charAt(adamRecord.getReadName.length() - 2) == '/' &&
(adamRecord.getReadName.charAt(adamRecord.getReadName.length() - 1) == '1' ||
adamRecord.getReadName.charAt(adamRecord.getReadName.length() - 1) == '2')
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,56 @@
*/
package org.bdgenomics.adam.converters

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.formats.avro.{ Contig, NucleotideContigFragment }
import scala.Int
import scala.Predef._
import scala.Some
import scala.collection.mutable

/**
* Object for converting an RDD containing FASTA sequence data into ADAM FASTA data.
*/
private[adam] object FastaConverter {

case class FastaDescriptionLine(fileIndex: Long = -1L, seqId: Int = 0, descriptionLine: Option[String] = None) {
/**
* Case class that describes a line in FASTA that begins with a ">".
*
* In FASTA, a sequence starts with a line that begins with a ">" and that
* gives the sequence name, and optionally, miscellaneous information about
* the sequence. If the file contains a single line, this description line
* can be omitted.
*
* @param fileIndex The line number where this line was seen in the file.
* @param seqId The index of this sequence in the file.
* @param descriptionLine An optional string that describes the FASTA line.
*/
case class FastaDescriptionLine(fileIndex: Long = -1L,
seqId: Int = 0,
descriptionLine: Option[String] = None) {
/**
* The contig name and description that was parsed out of this description line.
*/
val (contigName, contigDescription) = parseDescriptionLine(descriptionLine, fileIndex)

private def parseDescriptionLine(descriptionLine: Option[String], id: Long): (Option[String], Option[String]) = {
/**
* Parses the text of a given line.
*
* Assumes that the line contains the contig name followed by an optional
* description of the contig, with the two separated by a space.
*
* @throws IllegalArgumentException if there is no name in the line and the
* line is not the only record in a file (i.e., the file contains multiple
* contigs).
*
* @param descriptionLine The optional string describing the contig. If this
* is not set and this isn't the only line in the file, we throw.
* @param id The index of this contig in the file.
* @return Returns a tuple containing (the optional contig name, and the
* optional contig description).
*/
private def parseDescriptionLine(descriptionLine: Option[String],
id: Long): (Option[String], Option[String]) = {
descriptionLine.fold {
assert(id == -1L, "Cannot have a headerless line in a file with more than one fragment.")
require(id == -1L, "Cannot have a headerless line in a file with more than one fragment.")
(None: Option[String], None: Option[String])
} { (dL) =>
val splitIndex = dL.indexOf(' ')
Expand Down Expand Up @@ -106,18 +137,41 @@ private[adam] object FastaConverter {
descriptionLine.contigDescription
)
}

}

/**
* Cleans up a sequence by stripping asterisks at the end of the sequence.
*
* To be consistent with a legacy database, some FASTA sequences end in a "*"
* suffix. This method strips that suffix from the end of the sequence.
*
* @param sequence The sequence to clean.
* @return Sequence minus "*" suffix.
*/
private def cleanSequence(sequence: String): String = {
sequence.stripSuffix("*")
}

/**
* A FASTA line starting with ">" is a description line.
*
* @param line The line to check.
* @return True if the line starts with ">" and is thus a description line.
*/
private def isDescriptionLine(line: String): Boolean = {
line.startsWith(">")
}

def getDescriptionLines(rdd: RDD[(Long, String)]): Map[Long, FastaDescriptionLine] = {
/**
* Gets the description lines in a FASTA file.
*
* Filters an input RDD that contains (line number, line) pairs and returns
* all lines that are descriptions of a sequence.
*
* @param rdd RDD of (line number, line string) pairs to filter.
* @return Returns a map that maps sequence IDs to description lines.
*/
private[converters] def getDescriptionLines(rdd: RDD[(Long, String)]): Map[Long, FastaDescriptionLine] = {

rdd.filter(kv => isDescriptionLine(kv._2))
.collect()
Expand All @@ -126,7 +180,19 @@ private[adam] object FastaConverter {
.toMap
}

def findContigIndex(rowIdx: Long, indices: List[Long]): Long = {
/**
* Finds the index of a contig.
*
* The index of a contig is the highest index below the index of our row.
* Here, we define the index as the row number of the description line that
* describes this contig.
*
* @param rowIdx The row number of the contig row to check.
* @param indices A list containing the row numbers of all description lines.
* @return Returns the row index of the description line that describes this
* sequence line.
*/
private[converters] def findContigIndex(rowIdx: Long, indices: List[Long]): Long = {
val idx = indices.filter(_ <= rowIdx)
idx.max
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,45 @@ package org.bdgenomics.adam.converters

import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.io.Text
import org.bdgenomics.utils.misc.Logging
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Fragment
}
import org.bdgenomics.utils.misc.Logging

class FastqRecordConverter extends Serializable with Logging {
/**
* Utility class for converting FASTQ formatted data.
*
* FASTQ format is:
*
* {{{
* @readName
* sequence
* +<optional readname>
* ASCII quality scores
* }}}
*/
private[adam] class FastqRecordConverter extends Serializable with Logging {

/**
* Converts a read pair in FASTQ format into two AlignmentRecords.
*
* Used for processing a single fragment of paired end sequencing data stored
* in interleaved FASTQ. While interleaved FASTQ is not an "official" format,
* it is relatively common in the wild. As the name implies, the reads from
* a single sequencing fragment are interleaved in a single file, and are not
* split across two files.
*
* @param element Key-value pair of (void, and the FASTQ text). The text
* should correspond to exactly two records.
* @return Returns a length = 2 iterable of AlignmentRecords.
*
* @throws IllegalArgumentException Throws if records are misformatted. Each
* record must be 4 lines, and sequence and quality must be the same length.
*
* @see convertFragment
*/
def convertPair(element: (Void, Text)): Iterable[AlignmentRecord] = {
val lines = element._2.toString.split('\n')
require(lines.length == 8, "Record has wrong format:\n" + element._2.toString)
Expand Down Expand Up @@ -83,6 +113,18 @@ class FastqRecordConverter extends Serializable with Logging {
)
}

/**
* Converts a read pair in FASTQ format into a Fragment.
*
* @param element Key-value pair of (void, and the FASTQ text). The text
* should correspond to exactly two records.
* @return Returns a single Fragment containing two reads..
*
* @throws IllegalArgumentException Throws if records are misformatted. Each
* record must be 4 lines, and sequence and quality must be the same length.
*
* @see convertPair
*/
def convertFragment(element: (Void, Text)): Fragment = {
val lines = element._2.toString.split('\n')
require(lines.length == 8, "Record has wrong format:\n" + element._2.toString)
Expand Down Expand Up @@ -127,6 +169,24 @@ class FastqRecordConverter extends Serializable with Logging {
.build()
}

/**
* Converts a single FASTQ read into ADAM format.
*
* Used for processing a single fragment of paired end sequencing data stored
* in interleaved FASTQ. While interleaved FASTQ is not an "official" format,
* it is relatively common in the wild. As the name implies, the reads from
* a single sequencing fragment are interleaved in a single file, and are not
* split across two files.
*
* @param element Key-value pair of (void, and the FASTQ text). The text
* should correspond to exactly two records.
* @return Returns a length = 2 iterable of AlignmentRecords.
*
* @throws IllegalArgumentException Throws if records are misformatted. Each
* record must be 4 lines, and sequence and quality must be the same length.
*
* @see convertFragment
*/
def convertRead(
element: (Void, Text),
recordGroupOpt: Option[String] = None,
Expand Down
Loading