Commit

Merge f745bca into 9ea870c

fnothaft authored Mar 10, 2018
2 parents 9ea870c + f745bca commit 14d75a9
Showing 21 changed files with 549 additions and 18 deletions.
@@ -131,6 +131,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false
@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false
@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
var partitionedBinSize = 1000000

var command: String = null
}
@@ -562,8 +566,15 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
mergedSd
}

outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
if (args.partitionByStartPos) {
if (outputRdd.sequences.isEmpty) {
log.warn("This dataset is not aligned and therefore will not benefit from being saved as a partitioned dataset")
}
outputRdd.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
}
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = {
@@ -68,6 +68,12 @@ class TransformGenotypesArgs extends Args4jBase with ADAMSaveAnyArgs with Parque
@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
var stringency: String = "STRICT"

@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
var partitionedBinSize = 1000000

// must be defined due to ADAMSaveAnyArgs, but unused here
var sortFastqOutput: Boolean = false
}
@@ -135,7 +141,12 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
if (args.outputPath.endsWith(".vcf")) {
maybeSort(maybeCoalesce(genotypes.toVariantContexts)).saveAsVcf(args)
} else {
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
if (args.partitionByStartPos) {
maybeSort(maybeCoalesce(genotypes)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
}

}
}
}
185 changes: 185 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1867,6 +1867,40 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into an AlignmentRecordRDD.
*
* @note The sequence dictionary is read from an Avro file stored at
* pathName/_seqdict.avro and the record group dictionary is read from an
* Avro file stored at pathName/_rgdict.avro. These files are pure Avro,
* not Parquet + Avro.
*
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): AlignmentRecordRDD = {

val partitionBinSize = getPartitionBinSize(pathName)
val reads = loadParquetAlignments(pathName)
val alignmentsDatasetBound = DatasetBoundAlignmentRecordRDD(reads.dataset,
reads.sequences,
reads.recordGroups,
reads.processingSteps,
true,
Some(partitionBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) alignmentsDatasetBound.filterByOverlappingRegions(regions) else alignmentsDatasetBound
}
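
For example, loading a region from a previously partitioned dataset might look like the following (a minimal sketch, given an existing SparkContext sc; the path and coordinates are illustrative, and the implicit conversion comes from importing ADAMContext._):

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._

// Load only the partitions overlapping chr1:100,000-200,000 from a dataset
// previously written with saveAsPartitionedParquet. The path is hypothetical.
val region = ReferenceRegion("chr1", 100000L, 200000L)
val alignments = sc.loadPartitionedParquetAlignments(
  "sample.alignments.adam",
  regions = Iterable(region))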

/**
* Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
*
@@ -2250,6 +2284,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a GenotypeRDD.
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): GenotypeRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val genotypes = loadParquetGenotypes(pathName)
val genotypesDatasetBound = DatasetBoundGenotypeRDD(genotypes.dataset,
genotypes.sequences,
genotypes.samples,
genotypes.headerLines,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) genotypesDatasetBound.filterByOverlappingRegions(regions) else genotypesDatasetBound
}

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
@@ -2283,6 +2346,34 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a VariantRDD.
*
* @param pathName The path name to load variants from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a VariantRDD.
*/
def loadPartitionedParquetVariants(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): VariantRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val variants = loadParquetVariants(pathName)
val variantsDatasetBound = DatasetBoundVariantRDD(variants.dataset,
variants.sequences,
variants.headerLines,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) variantsDatasetBound.filterByOverlappingRegions(regions) else variantsDatasetBound
}

/**
* Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
*
@@ -2599,6 +2690,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a FeatureRDD.
*
* @param pathName The path name to load features from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a FeatureRDD.
*/
def loadPartitionedParquetFeatures(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): FeatureRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val features = loadParquetFeatures(pathName)
val featureDatasetBound = DatasetBoundFeatureRDD(features.dataset,
features.sequences,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) featureDatasetBound.filterByOverlappingRegions(regions) else featureDatasetBound
}

/**
* Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
*
@@ -2631,6 +2749,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD.
*
* @param pathName The path name to load nucleotide contig fragments from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadPartitionedParquetContigFragments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): NucleotideContigFragmentRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val contigs = loadParquetContigFragments(pathName)
val contigsDatasetBound = DatasetBoundNucleotideContigFragmentRDD(contigs.dataset,
contigs.sequences,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) contigsDatasetBound.filterByOverlappingRegions(regions) else contigsDatasetBound
}

/**
* Load a path name in Parquet + Avro format into a FragmentRDD.
*
@@ -3052,4 +3197,44 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

/**
* Return the length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
*
* @param pathName Path in which to look for the partitioned flag.
* @return Returns the length of partitions in base pairs if the files are partitioned.
*
* If a glob is used, all directories within the glob must be partitioned, and must have been saved
* using the same partitioned bin size. Behavior is undefined if this requirement is not met.
*/
private def getPartitionBinSize(pathName: String): Int = {

val partitionSizes = getFsAndFilesWithFilter(pathName, new FileFilter("_partitionedByStartPos")).map(f => {
val is = f.getFileSystem(sc.hadoopConfiguration).open(f)
val partitionSize = is.readInt
is.close()
partitionSize
}).toSet

require(partitionSizes.nonEmpty, "Input Parquet files (%s) are not partitioned.".format(pathName))
require(partitionSizes.size == 1, "Found multiple partition sizes (%s).".format(partitionSizes.mkString(", ")))
partitionSizes.head
}

/**
* Return true if the specified path of Parquet + Avro files is partitioned.
*
* @param pathName Path in which to look for partitioned flag.
* @return Return true if the specified path of Parquet + Avro files is partitioned.
* Behavior is undefined if some paths in the glob contain the _partitionedByStartPos flag file and some do not.
*/
def isPartitioned(pathName: String): Boolean = {
try {
getPartitionBinSize(pathName)
true
} catch {
case e: FileNotFoundException => false
}
}

}
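
A minimal guard sketch, assuming a hypothetical path: use isPartitioned to decide between the partitioned and plain Parquet loaders.

import org.bdgenomics.adam.rdd.ADAMContext._

// "sample.genotypes.adam" is an illustrative path.
val path = "sample.genotypes.adam"
val genotypes =
  if (sc.isPartitioned(path)) {
    sc.loadPartitionedParquetGenotypes(path)
  } else {
    sc.loadParquetGenotypes(path)
  }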
@@ -28,14 +28,15 @@ import htsjdk.variant.vcf.{
VCFHeaderLineType
}
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{ FSDataOutputStream, FileSystem, Path }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -62,6 +63,7 @@ import scala.collection.JavaConversions._
import scala.math.min
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

private[rdd] class JavaSaveArgs(var outputPath: String,
var blockSize: Int = 128 * 1024 * 1024,
@@ -2204,6 +2206,35 @@ trait DatasetBoundGenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]]
transformDataset(_.unpersist())
}

val isPartitioned: Boolean
val optPartitionBinSize: Option[Int]
val optLookbackPartitions: Option[Int]

private def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion]): String = {

if (Try(dataset("positionBin")).isSuccess) {
regions.map(r =>
s"(contigName=\'${r.referenceName}\' and positionBin >= " +
s"\'${(scala.math.floor(r.start / optPartitionBinSize.get).toInt) - optLookbackPartitions.get}\'" +
s" and positionBin < \'${(scala.math.floor(r.end / optPartitionBinSize.get).toInt + 1)}\'" +
s" and (end > ${r.start} and start < ${r.end}))"
).mkString(" or ")
} else { // if no positionBin field is found then construct query without bin optimization
regions.map(r =>
s"(contigName=\'${r.referenceName} \' " +
s"and (end > ${r.start} and start < ${r.end}))")
.mkString(" or ")
}
}

override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion]): V = {
if (isPartitioned) {
transformDataset((d: Dataset[U]) =>
d.filter(referenceRegionsToDatasetQueryString(querys)))
} else {
super[GenomicDataset].filterByOverlappingRegions(querys)
}
}
}
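
As an illustration of the predicate built above (hand-worked, not captured from the code): for a single query region chr1:5,000,000-5,100,000, a 1,000,000 bp bin size, and a lookback of one partition, the generated Spark SQL filter string would be roughly:

// floor(5000000 / 1000000) = 5, minus a lookback of 1 gives bin 4;
// floor(5100000 / 1000000) + 1 = 6 is the exclusive upper bin bound.
val predicate =
  "(contigName='chr1' and positionBin >= '4' and positionBin < '6' " +
    "and (end > 5000000 and start < 5100000))"

// Applied via Dataset.filter, this lets Spark prune whole
// contigName=*/positionBin=* directories before scanning Parquet.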

/**
@@ -2875,4 +2906,43 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
saveAsParquet(new JavaSaveArgs(filePath))
}

/**
* Save partition size into the partitioned Parquet flag file.
*
* @param filePath Path to save the file at.
* @param partitionSize Partition bin size, in base pairs, used in Hive-style partitioning.
*
*/
private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = {
val path = new Path(filePath, "_partitionedByStartPos")
val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration)
val f = fs.create(path)
f.writeInt(partitionSize)
f.close()
}

/**
* Saves this RDD to disk in range binned partitioned Parquet + Avro format.
*
* @param filePath Path to save the file at.
* @param compressCodec Name of the compression codec to use.
* @param partitionSize Size of partitions, in base pairs, used when writing Parquet. Defaults to 1000000.
*/
def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()
df.withColumn("positionBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "positionBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath, partitionSize)
saveMetadata(filePath)
}

}
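
A minimal write-side sketch under assumed paths: the call below lays out contigName=*/positionBin=* subdirectories under the output path and records the bin size in the _partitionedByStartPos flag file.

import org.bdgenomics.adam.rdd.ADAMContext._

// "input.sam" and "alignments.partitioned.adam" are illustrative paths.
val reads = sc.loadAlignments("input.sam")
reads.saveAsPartitionedParquet(
  "alignments.partitioned.adam",
  partitionSize = 1000000)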
@@ -137,7 +137,10 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] (

case class DatasetBoundNucleotideContigFragmentRDD private[rdd] (
dataset: Dataset[NucleotideContigFragmentProduct],
sequences: SequenceDictionary) extends NucleotideContigFragmentRDD
sequences: SequenceDictionary,
override val isPartitioned: Boolean = true,
override val optPartitionBinSize: Option[Int] = Some(1000000),
override val optLookbackPartitions: Option[Int] = Some(1)) extends NucleotideContigFragmentRDD
with DatasetBoundGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] {

lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro)