removed sd option from loadVcf and added indexing on multiple regions for indexed bam and vcf files
akmorrow13 committed Aug 4, 2016
1 parent c23aace commit 2544a3d
Showing 4 changed files with 107 additions and 40 deletions.
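For orientation, here is how the APIs touched by this commit are called after the change — a sketch assuming the usual `import org.bdgenomics.adam.rdd.ADAMContext._` implicits on an existing SparkContext `sc`; the file paths are illustrative and the coordinates are borrowed from the tests below.

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._

def demo(sc: SparkContext): Unit = {
  // sdOpt is gone: loadVcf now always takes its sequence dictionary from the VCF header.
  val variants = sc.loadVcf("sample.vcf")

  // The indexed loaders now accept one region or several (.bai/.tbi index required).
  val reads = sc.loadIndexedBam("sorted.bam",
    Iterable(ReferenceRegion("chr2", 100, 101), ReferenceRegion("3", 10, 17)))
  val vcs = sc.loadIndexedVcf("bqsr1.vcf",
    Iterable(ReferenceRegion("22", 16050678, 16050822), ReferenceRegion("22", 16097644, 16098647)))
}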
@@ -61,11 +61,7 @@ class Vcf2ADAM(val args: Vcf2ADAMArgs) extends BDGSparkCommand[Vcf2ADAMArgs] wit

def run(sc: SparkContext) {

var dictionary: Option[SequenceDictionary] = loadSequenceDictionary(args.dictionaryFile)
if (dictionary.isDefined)
log.info("Using contig translation")

val variantContextRdd = sc.loadVcf(args.vcfPath, sdOpt = dictionary)
val variantContextRdd = sc.loadVcf(args.vcfPath)
var variantContextsToSave = if (args.coalesce > 0) {
if (args.coalesce > variantContextRdd.rdd.partitions.size || args.forceShuffle) {
variantContextRdd.transform(_.coalesce(args.coalesce, shuffle = true))
106 changes: 77 additions & 29 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -17,10 +17,12 @@
*/
package org.bdgenomics.adam.rdd

import collection.JavaConverters._
import java.io.{ File, FileNotFoundException, InputStream }
import java.util.regex.Pattern
import com.google.common.collect.ImmutableList
import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.samtools.util.{ Interval, Locatable }
import htsjdk.variant.vcf.VCFHeader
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
@@ -354,6 +356,18 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param viewRegion The ReferenceRegion we are filtering on
*/
def loadIndexedBam(filePath: String, viewRegion: ReferenceRegion): AlignmentRecordRDD = {
loadIndexedBam(filePath, Iterable(viewRegion))
}

/**
* Functions like loadBam, but uses bam index files to look at fewer blocks,
* and only returns records within the specified ReferenceRegions. Bam index file required.
*
* @param filePath The path to the input data. Currently this path must correspond to
a single Bam file. The associated bam index file must have the same name.
* @param viewRegions Iterable of ReferenceRegions we are filtering on
*/
def loadIndexedBam(filePath: String, viewRegions: Iterable[ReferenceRegion])(implicit s: DummyImplicit): AlignmentRecordRDD = {
val path = new Path(filePath)
val fs = FileSystem.get(path.toUri, sc.hadoopConfiguration)
assert(!fs.isDirectory(path))
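The `(implicit s: DummyImplicit)` on the new overload is the standard Scala device for keeping overloads apart: the extra implicit parameter changes the erased signature without burdening callers, because `Predef` always supplies a `DummyImplicit`. The commit doesn't spell out the motivation here, so the sketch below only shows the general pattern, with illustrative names rather than ADAM's actual methods.

object OverloadSketch {
  // After type erasure these would both be load(String, Iterable) and the
  // pair would not compile; the DummyImplicit gives the second a distinct
  // signature, while callers still just write load("path", Seq(1, 2)).
  def load(path: String, regions: Iterable[String]): Int = regions.size
  def load(path: String, regions: Iterable[Int])(implicit d: DummyImplicit): Int = regions.sum
}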
@@ -386,7 +400,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

val job = HadoopUtil.newJob(sc)
val conf = ContextUtil.getConfiguration(job)
BAMInputFormat.setIntervals(conf, List(LocatableReferenceRegion(viewRegion)))
BAMInputFormat.setIntervals(conf, viewRegions.toList.map(r => LocatableReferenceRegion(r)))

val records = sc.newAPIHadoopFile(filePath, classOf[BAMInputFormat], classOf[LongWritable],
classOf[SAMRecordWritable], conf)
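`LocatableReferenceRegion`, used above, is ADAM's adapter from `ReferenceRegion` to htsjdk's `Locatable`, the interface Hadoop-BAM's `setIntervals` filters on. A minimal sketch of what such an adapter must provide — illustrative, not ADAM's actual class — keeping in mind that ADAM regions are 0-based and end-exclusive while `Locatable` is 1-based and end-inclusive:

import htsjdk.samtools.util.Locatable

// Illustrative adapter; the +1 converts a 0-based start to a 1-based one.
case class LocatableRegion(contig: String, start: Long, end: Long) extends Locatable {
  override def getContig: String = contig
  override def getStart: Int = (start + 1).toInt
  override def getEnd: Int = end.toInt
}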
@@ -410,7 +424,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* As such, we must force the user to pass in the schema.
*
* @tparam T The type of the specific record we are loading.
* @param filename Path to load file from.
* @param filename Path to Vf file from.
* @param schema Schema of records we are loading.
* @return Returns a Seq containing the avro records.
*/
@@ -596,45 +610,84 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
))
}

private def readVcfRecords(filePath: String): RDD[(LongWritable, VariantContextWritable)] = {
private def readVcfRecords(filePath: String, viewRegions: Option[Iterable[ReferenceRegion]]): RDD[(LongWritable, VariantContextWritable)] = {
// load vcf data
val job = HadoopUtil.newJob(sc)
job.getConfiguration().setStrings("io.compression.codecs",
classOf[BGZFCodec].getCanonicalName(),
classOf[BGZFEnhancedGzipCodec].getCanonicalName())

val conf = ContextUtil.getConfiguration(job)
if (viewRegions.isDefined) {
val intervals: Iterable[Interval] =
viewRegions.get.map(r => {
new Interval(r.referenceName, r.start.toInt, r.end.toInt)
})
VCFInputFormat.setIntervals(conf, ImmutableList.copyOf[Interval](intervals.asJava))
}

sc.newAPIHadoopFile(
filePath,
classOf[VCFInputFormat], classOf[LongWritable], classOf[VariantContextWritable],
ContextUtil.getConfiguration(job)
conf
)
}
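One coordinate subtlety in readVcfRecords above: `ReferenceRegion` is 0-based and end-exclusive, while htsjdk's `Interval` is 1-based and end-inclusive, so passing `start` and `end` through unchanged effectively shifts the queried window one base to the left. For block-level index filtering that widening is probably harmless — an assumption, since downstream filtering is not shown in this diff — but an exact translation would look like this hypothetical helper, which is not part of the commit:

import htsjdk.samtools.util.Interval

// Exact 0-based half-open -> 1-based closed conversion.
def toInterval(referenceName: String, start: Long, end: Long): Interval =
  new Interval(referenceName, (start + 1).toInt, end.toInt)

// 0-based [16097644, 16098647) becomes 1-based [16097645, 16098647].
val exact = toInterval("22", 16097644L, 16098647L)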

/**
* Loads a VCF file into an RDD.
*
* @param filePath The file to load.
* @param sdOpt An optional sequence dictionary, in case the sequence info
* is not included in the VCF.
* @return Returns a VariantContextRDD.
*/
def loadVcf(filePath: String,
sdOpt: Option[SequenceDictionary] = None): VariantContextRDD = {
def loadVcf(filePath: String): VariantContextRDD = {

// load records from VCF
val records = readVcfRecords(filePath, None)

val vcc = new VariantContextConverter(sdOpt)
// attach instrumentation
if (Metrics.isRecording) records.instrument() else records

// load vcf metadata
val (sd, samples) = loadVcfMetadata(filePath)

val vcc = new VariantContextConverter(Some(sd))

VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)),
sd,
samples)
}
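With `sdOpt` removed, the converter above is always built from the header-derived dictionary returned by loadVcfMetadata; callers that previously supplied a dictionary for VCFs lacking contig lines now depend on the header alone. For illustration, a caller can inspect the dictionary a load ends up with — this assumes `VariantContextRDD` exposes its `sequences` member and `SequenceDictionary` its `records`, which is how they are used elsewhere in ADAM:

val vcs = sc.loadVcf("sample.vcf")
// The dictionary now always comes from the VCF header.
vcs.sequences.records.foreach(r => println(s"${r.name}: ${r.length}"))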

/**
* Loads a VCF file indexed by a tabix (tbi) file into an RDD.
*
* @param filePath The file to load.
* @param viewRegion The ReferenceRegion we are filtering on.
* @return Returns a VariantContextRDD.
*/
def loadIndexedVcf(filePath: String,
viewRegion: ReferenceRegion): VariantContextRDD =
loadIndexedVcf(filePath, Iterable(viewRegion))

/**
* Loads a VCF file indexed by a tabix (tbi) file into an RDD.
*
* @param filePath The file to load.
* @param viewRegions Iterable of ReferenceRegions we are filtering on.
* @return Returns a VariantContextRDD.
*/
def loadIndexedVcf(filePath: String,
viewRegions: Iterable[ReferenceRegion])(implicit s: DummyImplicit): VariantContextRDD = {

// load records from VCF
val records = readVcfRecords(filePath)
val records = readVcfRecords(filePath, Some(viewRegions))

// attach instrumentation
if (Metrics.isRecording) records.instrument() else records

// load vcf metadata
val (vcfSd, samples) = loadVcfMetadata(filePath)
val (sd, samples) = loadVcfMetadata(filePath)

// we can only replace the sequences header if the sequence info was missing on the vcf
require(sdOpt.isEmpty || vcfSd.isEmpty,
"Only one of the provided or VCF sequence dictionary can be specified.")
val sd = sdOpt.getOrElse(vcfSd)
val vcc = new VariantContextConverter(Some(sd))

VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)),
sd,
@@ -783,9 +836,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}

def loadVcfAnnotations(
filePath: String,
sd: Option[SequenceDictionary] = None): DatabaseVariantAnnotationRDD = {
loadVcf(filePath, sd).toDatabaseVariantAnnotationRDD
filePath: String): DatabaseVariantAnnotationRDD = {
loadVcf(filePath).toDatabaseVariantAnnotationRDD
}

def loadParquetVariantAnnotations(
@@ -799,14 +851,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

def loadVariantAnnotations(
filePath: String,
projection: Option[Schema] = None,
sd: Option[SequenceDictionary] = None): DatabaseVariantAnnotationRDD = {
projection: Option[Schema] = None): DatabaseVariantAnnotationRDD = {
if (filePath.endsWith(".vcf")) {
log.info(s"Loading $filePath as VCF, and converting to variant annotations. Projection is ignored.")
loadVcfAnnotations(filePath, sd)
loadVcfAnnotations(filePath)
} else {
log.info(s"Loading $filePath as Parquet containing DatabaseVariantAnnotations.")
sd.foreach(sd => log.warn("Sequence dictionary for translation ignored if loading ADAM from Parquet."))
loadParquetVariantAnnotations(filePath, None, projection)
}
}
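loadVariantAnnotations — like loadGenotypes and loadVariants below — dispatches on file extension: a `.vcf` path goes through the htsjdk VCF reader, where the Avro projection is ignored, and anything else is read as Parquet, where the projection applies. A small usage sketch with illustrative paths:

// VCF input: parsed with htsjdk; projection ignored, dictionary from the header.
val fromVcf = sc.loadVariantAnnotations("annotations.vcf")

// Parquet input: the optional Avro projection is honored.
val fromParquet = sc.loadVariantAnnotations("annotations.adam")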
@@ -877,11 +927,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

def loadGenotypes(
filePath: String,
projection: Option[Schema] = None,
sd: Option[SequenceDictionary] = None): GenotypeRDD = {
projection: Option[Schema] = None): GenotypeRDD = {
if (filePath.endsWith(".vcf")) {
log.info(s"Loading $filePath as VCF, and converting to Genotypes. Projection is ignored.")
loadVcf(filePath, sd).toGenotypeRDD
loadVcf(filePath).toGenotypeRDD
} else {
log.info(s"Loading $filePath as Parquet containing Genotypes. Sequence dictionary for translation is ignored.")
loadParquetGenotypes(filePath, None, projection)
@@ -890,11 +939,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

def loadVariants(
filePath: String,
projection: Option[Schema] = None,
sd: Option[SequenceDictionary] = None): VariantRDD = {
projection: Option[Schema] = None): VariantRDD = {
if (filePath.endsWith(".vcf")) {
log.info(s"Loading $filePath as VCF, and converting to Variants. Projection is ignored.")
loadVcf(filePath, sd).toVariantRDD
loadVcf(filePath).toVariantRDD
} else {
log.info(s"Loading $filePath as Parquet containing Variants. Sequence dictionary for translation is ignored.")
loadParquetVariants(filePath, None, projection)
Binary file added adam-core/src/test/resources/bqsr1.vcf.tbi
@@ -278,34 +278,49 @@ class ADAMContextSuite extends ADAMFunSuite {

sparkTest("can read a gzipped .vcf file") {
val path = resourcePath("test.vcf.gz")
val vcs = sc.loadVcf(path, None)
val vcs = sc.loadVcf(path)
assert(vcs.rdd.count === 6)
}

sparkTest("can read a BGZF gzipped .vcf file with .gz file extension") {
val path = resourcePath("test.vcf.bgzf.gz")
val vcs = sc.loadVcf(path, None)
val vcs = sc.loadVcf(path)
assert(vcs.rdd.count === 6)
}

sparkTest("can read a BGZF gzipped .vcf file with .bgz file extension") {
val path = resourcePath("test.vcf.bgz")
val vcs = sc.loadVcf(path, None)
val vcs = sc.loadVcf(path)
assert(vcs.rdd.count === 6)
}

ignore("can read an uncompressed BCFv2.2 file") { // see https://github.com/samtools/htsjdk/issues/507
val path = resourcePath("test.uncompressed.bcf")
val vcs = sc.loadVcf(path, None)
val vcs = sc.loadVcf(path)
assert(vcs.rdd.count === 6)
}

ignore("can read a BGZF compressed BCFv2.2 file") { // see https://github.com/samtools/htsjdk/issues/507
val path = resourcePath("test.compressed.bcf")
val vcs = sc.loadVcf(path, None)
val vcs = sc.loadVcf(path)
assert(vcs.rdd.count === 6)
}

sparkTest("loadIndexedVcf with 1 ReferenceRegion") {
val path = resourcePath("bqsr1.vcf")
val refRegion = ReferenceRegion("22", 16097644, 16098647)
val vcs = sc.loadIndexedVcf(path, refRegion)
assert(vcs.rdd.count == 17)
}

sparkTest("loadIndexedVcf with multiple ReferenceRegions") {
val path = resourcePath("bqsr1.vcf")
val refRegion1 = ReferenceRegion("22", 16050678, 16050822)
val refRegion2 = ReferenceRegion("22", 16097644, 16098647)
val vcs = sc.loadIndexedVcf(path, Iterable(refRegion1, refRegion2))
assert(vcs.rdd.count == 23)
}
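Two side notes on these tests. The fixture pairs a plain-text bqsr1.vcf with the binary bqsr1.vcf.tbi added above, although tabix indexes are normally built from bgzip-compressed VCFs, so treat that pairing as a test convenience rather than a recipe. And per the delegation added in this commit, the single-region call in the first test is shorthand for the Iterable form:

// Equivalent calls: the single-region overload wraps its argument.
sc.loadIndexedVcf(path, ReferenceRegion("22", 16097644, 16098647))
sc.loadIndexedVcf(path, Iterable(ReferenceRegion("22", 16097644, 16098647)))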

(1 to 4) foreach { testNumber =>
val inputName = "interleaved_fastq_sample%d.ifq".format(testNumber)
val path = ClassLoader.getSystemClassLoader.getResource(inputName).getFile
@@ -417,11 +432,19 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(last.getFragmentEndPosition === 251929L)
}

sparkTest("loadIndexedBam") {
sparkTest("loadIndexedBam with 1 ReferenceRegion") {
val refRegion = ReferenceRegion("chr2", 100, 101)
val path = resourcePath("sorted.bam")
val reads = sc.loadIndexedBam(path, refRegion)
assert(reads.rdd.count == 1)
}

sparkTest("loadIndexedBam with multiple ReferenceRegions") {
val refRegion1 = ReferenceRegion("chr2", 100, 101)
val refRegion2 = ReferenceRegion("3", 10, 17)
val path = resourcePath("sorted.bam")
val reads = sc.loadIndexedBam(path, Iterable(refRegion1, refRegion2))
assert(reads.rdd.count == 2)
}
}
