Mango using Partitioned parquet ADAM #358

Closed
wants to merge 12 commits
@@ -297,6 +297,9 @@ class VizReadsArgs extends Args4jBase with ParquetArgs {
@Args4jOption(required = false, name = "-preload", usage = "Chromosomes to prefetch, separated by commas (,).")
var preload: String = null

  @Args4jOption(required = false, name = "-parquetIsBinned", usage = "Enables a pre-fetch warm-up step for binned (partitioned) Parquet files.")
Contributor (review comment): remove

var parquetIsBinned: Boolean = false

}

class VizServlet extends ScalatraServlet {
@@ -667,6 +670,11 @@ class VizReads(protected val args: VizReadsArgs) extends BDGSparkCommand[VizRead
VizReads.genes = Some(args.genePath)
}

// initialize binned parquet by doing a small query to force warm-up
if (args.parquetIsBinned) {
Contributor (review comment): change to foreach(readsFile -> if isPartitioned(readsFile) then VizReads.materializer.getReads()....

Contributor (review comment): or if one of them is partitioned

VizReads.readsCache = VizReads.materializer.getReads().get.getJson(ReferenceRegion(VizReads.materializer.getReads().get.getDictionary.records.head.name, 2L, 100L))
}
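A hedged sketch of what the reviewers' suggestion above could look like: drop the flag and warm up only the read files that were actually written as partitioned Parquet. The `readsPaths` collection is a hypothetical name for the read files passed to Mango; the materializer and dictionary calls are taken from the snippet above, not from a final implementation.

```scala
// Sketch only: warm up partitioned read files without a -parquetIsBinned flag.
// readsPaths is a hypothetical Seq[String] of read file paths handed to Mango.
readsPaths.foreach(readsFile =>
  if (sc.isPartitioned(readsFile)) {
    // a tiny query against the first contig forces the partitioned dataset to load
    val dict = VizReads.materializer.getReads().get.getDictionary
    VizReads.readsCache = VizReads.materializer.getReads().get
      .getJson(ReferenceRegion(dict.records.head.name, 2L, 100L))
  })
```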

// start server
if (!args.testMode) {
if (args.debugFrontend)
@@ -26,10 +26,10 @@ import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.io.api.Binary
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }
import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary }
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD }
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.mango.converters.GA4GHConverter
import org.bdgenomics.mango.layout.PositionCount
@@ -38,11 +38,13 @@ import org.bdgenomics.utils.misc.Logging
import org.bdgenomics.utils.instrumentation.Metrics
import org.ga4gh.{ GAReadAlignment, GASearchReadsResponse }
import net.liftweb.json.Serialization._
import org.apache.spark.sql.Dataset
import org.bdgenomics.adam.sql
import org.seqdoop.hadoop_bam.util.SAMHeaderReader

import scala.collection.JavaConversions._
import scala.reflect._
import scala.collection.JavaConverters._

import scala.reflect.ClassTag

// metric variables
@@ -150,6 +152,10 @@ object AlignmentRecordMaterialization extends Logging {

val name = "AlignmentRecord"

  // caches partitioned (binned) datasets by file path, so the several-minutes-long
  // initialization of each dataset is done only once
val datasetCache = new collection.mutable.HashMap[String, AlignmentRecordRDD]
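If the cache stays a mutable HashMap, the lookup-then-insert pattern used further down in this file can also be written with getOrElseUpdate; a minimal sketch, assuming the same SparkContext and ADAMContext implicits already imported above:

```scala
// Sketch: get-or-load in one step; the loader only runs on a cache miss.
def cachedPartitionedAlignments(sc: SparkContext, fp: String): AlignmentRecordRDD =
  datasetCache.getOrElseUpdate(fp, sc.loadPartitionedParquetAlignments(fp))
```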

/**
* Loads alignment data from bam, sam and ADAM file formats
* @param sc SparkContext
@@ -213,20 +219,75 @@
* @return RDD of data from the file over specified ReferenceRegion
*/
def loadAdam(sc: SparkContext, fp: String, regions: Option[Iterable[ReferenceRegion]]): AlignmentRecordRDD = {

AlignmentTimers.loadADAMData.time {
      val alignmentRecordRDD = if (sc.isPartitioned(fp)) {
        // finalRegions includes each requested contig both with and without the "chr" prefix,
        // so the query matches whichever naming convention the file uses
        val finalRegions: Iterable[ReferenceRegion] = regions.getOrElse(Iterable.empty) ++
          regions.getOrElse(Iterable.empty)
            .map(x => ReferenceRegion(x.referenceName.replaceFirst("^chr", ""),
              x.start,
              x.end,
              x.strand))

// load new dataset or retrieve from cache
        val data: AlignmentRecordRDD = datasetCache.get(fp) match {
          case Some(ds) => {
            log.info(s"Using cached partitioned dataset for ${fp}")
            ds
          }
          case _ => {
            log.info(s"Loading partitioned dataset for ${fp} into the cache")
            datasetCache(fp) = sc.loadPartitionedParquetAlignments(fp)
            datasetCache(fp)
          }
        }

val data2: DatasetBoundAlignmentRecordRDD = DatasetBoundAlignmentRecordRDD(data.dataset,
data.sequences,
data.recordGroups,
data.processingSteps)

        // restrict to the requested regions when provided, then keep only mapped reads with mapq > 0
        val partitionedResult = regions match {
          case Some(_) =>
            data2.filterDatasetByOverlappingRegions(finalRegions)
              .transformDataset(d => d.filter(r => r.readMapped.getOrElse(false) && r.mapq.getOrElse(0) > 0))
          case _ =>
            data2.transformDataset(d => d.filter(r => r.readMapped.getOrElse(false) && r.mapq.getOrElse(0) > 0))
        }
partitionedResult
} else { // data was not written as partitioned parquet
val pred =
if (regions.isDefined) {
val prefixRegions: Iterable[ReferenceRegion] = regions.get.map(r => LazyMaterialization.getContigPredicate(r)).flatten
Some(ResourceUtils.formReferenceRegionPredicate(prefixRegions) && (BooleanColumn("readMapped") === true) && (IntColumn("mapq") > 0))
} else {
Some((BooleanColumn("readMapped") === true) && (IntColumn("mapq") > 0))
}

val proj = Projection(AlignmentRecordField.contigName, AlignmentRecordField.mapq, AlignmentRecordField.readName,
AlignmentRecordField.start, AlignmentRecordField.readMapped, AlignmentRecordField.recordGroupName,
AlignmentRecordField.end, AlignmentRecordField.sequence, AlignmentRecordField.cigar, AlignmentRecordField.readNegativeStrand,
AlignmentRecordField.readPaired, AlignmentRecordField.recordGroupSample)

val unpartitionedResult = sc.loadParquetAlignments(fp, optPredicate = pred, optProjection = Some(proj))
unpartitionedResult
}
      alignmentRecordRDD
}
}
}
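For orientation, a hedged usage sketch of the method above; the SparkContext, file path, and region below are placeholders, and the partitioned branch is taken only when the directory carries ADAM's partitioned-Parquet metadata.

```scala
// Illustrative only: the path and region are made up for the example.
val region = ReferenceRegion("chr20", 100000L, 200000L)
val reads = AlignmentRecordMaterialization.loadAdam(sc, "/data/sample.alignments.adam", Some(Seq(region)))
// Both branches keep only mapped reads with mapq > 0; the partitioned branch also
// queries the region under both the "chr20" and "20" contig spellings.
println(reads.rdd.count())
```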
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary }
import org.bdgenomics.adam.projections.{ Projection, VariantField }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.variant.{ VariantContextRDD }
import org.bdgenomics.adam.rdd.variant.{ GenotypeRDD, VariantContextRDD }
import org.bdgenomics.formats.avro.{ Variant, GenotypeAllele }
import org.bdgenomics.mango.core.util.ResourceUtils
import org.bdgenomics.mango.layout.GenotypeJson
@@ -145,6 +145,7 @@ class VariantContextMaterialization(@transient sc: SparkContext,
object VariantContextMaterialization {

val name = "VariantContext"
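  // caches partitioned genotype datasets by file path so their initialization is not repeated on every query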
val datasetCache = new collection.mutable.HashMap[String, GenotypeRDD]

/**
* Loads variant data from adam and vcf files into a VariantContextRDD
@@ -197,15 +198,33 @@
* @return VariantContextRDD
*/
def loadAdam(sc: SparkContext, fp: String, regions: Option[Iterable[ReferenceRegion]]): VariantContextRDD = {
    if (sc.isPartitioned(fp)) {
      // load the partitioned genotype dataset, caching it on first use
      val genotypes: GenotypeRDD = datasetCache.get(fp) match {
        case Some(ds) => ds
        case _ => {
          val loadedDataset = sc.loadPartitionedParquetGenotypes(fp)
          datasetCache(fp) = loadedDataset
          loadedDataset
        }
      }
      // restrict to the requested regions, when given
      val filtered = genotypes.transformDataset(d => regions match {
        case Some(r) => d.filter(sc.referenceRegionsToDatasetQueryString(r))
        case _       => d
      })
      filtered.toVariantContexts()
} else {
val pred =
if (regions.isDefined) {
val prefixRegions: Iterable[ReferenceRegion] = regions.get.map(r => LazyMaterialization.getContigPredicate(r)).flatten
Some(ResourceUtils.formReferenceRegionPredicate(prefixRegions))
} else {
None
}
sc.loadParquetGenotypes(fp, optPredicate = pred).toVariantContexts
}
}
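A hedged illustration of the dataset-side region filter used in the partitioned branch above: referenceRegionsToDatasetQueryString builds a Spark SQL condition string over the genomic coordinates, so the filter conceptually behaves like the hand-written condition below. The exact string ADAM generates may differ, and `genotypeDataset` stands in for the Dataset passed to transformDataset.

```scala
// Illustrative only: an equivalent hand-written overlap filter for chr1:1000-2000.
// ADAM's referenceRegionsToDatasetQueryString produces a comparable condition string.
val subset = genotypeDataset.filter("contigName = 'chr1' and start < 2000 and end > 1000")
```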

/**