diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala
index c3737ab4f1..4883ce8ecc 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala
@@ -131,6 +131,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
   var storageLevel: String = "MEMORY_ONLY"
   @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
   var disableProcessingStep = false
+  @Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
+  var partitionByStartPos: Boolean = false
+  @Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000 base pairs).")
+  var partitionedBinSize = 1000000

   var command: String = null
 }
@@ -562,8 +566,15 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
       mergedSd
     }

-    outputRdd.save(args,
-      isSorted = args.sortReads || args.sortLexicographically)
+    if (args.partitionByStartPos) {
+      if (outputRdd.sequences.isEmpty) {
+        log.warn("This dataset is not aligned and therefore will not benefit from being saved as a partitioned dataset")
+      }
+      outputRdd.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
+    } else {
+      outputRdd.save(args,
+        isSorted = args.sortReads || args.sortLexicographically)
+    }
   }

   private def createKnownSnpsTable(sc: SparkContext): SnpTable = {
diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala
index 3ede094c0f..15e0df35a1 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformGenotypes.scala
@@ -68,6 +68,12 @@ class TransformGenotypesArgs extends Args4jBase with ADAMSaveAnyArgs with Parque
   @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
   var stringency: String = "STRICT"

+  @Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
+  var partitionByStartPos: Boolean = false
+
+  @Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000 base pairs).")
+  var partitionedBinSize = 1000000
+
   // must be defined due to ADAMSaveAnyArgs, but unused here
   var sortFastqOutput: Boolean = false
 }
@@ -135,7 +141,12 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
     if (args.outputPath.endsWith(".vcf")) {
       maybeSort(maybeCoalesce(genotypes.toVariantContexts)).saveAsVcf(args)
     } else {
-      maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
+      if (args.partitionByStartPos) {
+        maybeSort(maybeCoalesce(genotypes)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
+      } else {
+        maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
+      }
+
     }
   }
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 30f2c300a6..168bb32d79 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1867,6 +1867,40 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }

+  /**
+   * Load a path name with range binned partitioned Parquet + Avro format into an AlignmentRecordRDD.
+   *
+   * @note The sequence dictionary is read from an Avro file stored at
+   *   pathName/_seqdict.avro and the record group dictionary is read from an
+   *   Avro file stored at pathName/_rgdict.avro. These files are pure Avro,
+   *   not Parquet + Avro.
+   *
+   * @param pathName The path name to load alignment records from.
+   *   Globs/directories are supported.
+   * @param regions Optional list of genomic regions to load.
+   * @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
+   *   region when using the filterByOverlappingRegions function on the returned dataset.
+   *   Defaults to one partition.
+   * @return Returns an AlignmentRecordRDD.
+   */
+  def loadPartitionedParquetAlignments(pathName: String,
+                                       regions: Iterable[ReferenceRegion] = Iterable.empty,
+                                       optLookbackPartitions: Option[Int] = Some(1)): AlignmentRecordRDD = {
+
+    val partitionBinSize = getPartitionBinSize(pathName)
+    val reads = loadParquetAlignments(pathName)
+    val alignmentsDatasetBound = DatasetBoundAlignmentRecordRDD(reads.dataset,
+      reads.sequences,
+      reads.recordGroups,
+      reads.processingSteps,
+      true,
+      Some(partitionBinSize),
+      optLookbackPartitions
+    )
+
+    if (regions.nonEmpty) alignmentsDatasetBound.filterByOverlappingRegions(regions) else alignmentsDatasetBound
+  }
+
   /**
    * Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
    *
@@ -2250,6 +2284,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }

+  /**
+   * Load a path name with range binned partitioned Parquet + Avro format into a GenotypeRDD.
+   *
+   * @param pathName The path name to load genotypes from.
+   *   Globs/directories are supported.
+   * @param regions Optional list of genomic regions to load.
+   * @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
+   *   region when using the filterByOverlappingRegions function on the returned dataset.
+   *   Defaults to one partition.
+   * @return Returns a GenotypeRDD.
+   */
+  def loadPartitionedParquetGenotypes(pathName: String,
+                                      regions: Iterable[ReferenceRegion] = Iterable.empty,
+                                      optLookbackPartitions: Option[Int] = Some(1)): GenotypeRDD = {
+
+    val partitionedBinSize = getPartitionBinSize(pathName)
+    val genotypes = loadParquetGenotypes(pathName)
+    val genotypesDatasetBound = DatasetBoundGenotypeRDD(genotypes.dataset,
+      genotypes.sequences,
+      genotypes.samples,
+      genotypes.headerLines,
+      true,
+      Some(partitionedBinSize),
+      optLookbackPartitions
+    )
+
+    if (regions.nonEmpty) genotypesDatasetBound.filterByOverlappingRegions(regions) else genotypesDatasetBound
+  }
+
   /**
    * Load a path name in Parquet + Avro format into a VariantRDD.
    *
@@ -2283,6 +2346,34 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }

+  /**
+   * Load a path name with range binned partitioned Parquet + Avro format into a VariantRDD.
+   *
+   * @param pathName The path name to load variants from.
+   *   Globs/directories are supported.
+   * @param regions Optional list of genomic regions to load.
+   * @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
+   *   region when using the filterByOverlappingRegions function on the returned dataset.
+   *   Defaults to one partition.
+   * @return Returns a VariantRDD.
+   */
+  def loadPartitionedParquetVariants(pathName: String,
+                                     regions: Iterable[ReferenceRegion] = Iterable.empty,
+                                     optLookbackPartitions: Option[Int] = Some(1)): VariantRDD = {
+
+    val partitionedBinSize = getPartitionBinSize(pathName)
+    val variants = loadParquetVariants(pathName)
+    val variantsDatasetBound = DatasetBoundVariantRDD(variants.dataset,
+      variants.sequences,
+      variants.headerLines,
+      true,
+      Some(partitionedBinSize),
+      optLookbackPartitions
+    )
+
+    if (regions.nonEmpty) variantsDatasetBound.filterByOverlappingRegions(regions) else variantsDatasetBound
+  }
+
   /**
    * Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
    *
@@ -2599,6 +2690,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }

+  /**
+   * Load a path name with range binned partitioned Parquet + Avro format into a FeatureRDD.
+   *
+   * @param pathName The path name to load features from.
+   *   Globs/directories are supported.
+   * @param regions Optional list of genomic regions to load.
+   * @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
+   *   region when using the filterByOverlappingRegions function on the returned dataset.
+   *   Defaults to one partition.
+   * @return Returns a FeatureRDD.
+   */
+  def loadPartitionedParquetFeatures(pathName: String,
+                                     regions: Iterable[ReferenceRegion] = Iterable.empty,
+                                     optLookbackPartitions: Option[Int] = Some(1)): FeatureRDD = {
+
+    val partitionedBinSize = getPartitionBinSize(pathName)
+    val features = loadParquetFeatures(pathName)
+    val featureDatasetBound = DatasetBoundFeatureRDD(features.dataset,
+      features.sequences,
+      true,
+      Some(partitionedBinSize),
+      optLookbackPartitions
+    )
+
+    if (regions.nonEmpty) featureDatasetBound.filterByOverlappingRegions(regions) else featureDatasetBound
+  }
+
   /**
    * Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
    *
@@ -2631,6 +2749,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }

+  /**
+   * Load a path name with range binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD.
+   *
+   * @param pathName The path name to load nucleotide contig fragments from.
+   *   Globs/directories are supported.
+   * @param regions Optional list of genomic regions to load.
+   * @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
+   *   region when using the filterByOverlappingRegions function on the returned dataset.
+   *   Defaults to one partition.
+   * @return Returns a NucleotideContigFragmentRDD.
+   */
+  def loadPartitionedParquetContigFragments(pathName: String,
+                                            regions: Iterable[ReferenceRegion] = Iterable.empty,
+                                            optLookbackPartitions: Option[Int] = Some(1)): NucleotideContigFragmentRDD = {
+
+    val partitionedBinSize = getPartitionBinSize(pathName)
+    val contigs = loadParquetContigFragments(pathName)
+    val contigsDatasetBound = DatasetBoundNucleotideContigFragmentRDD(contigs.dataset,
+      contigs.sequences,
+      true,
+      Some(partitionedBinSize),
+      optLookbackPartitions
+    )
+
+    if (regions.nonEmpty) contigsDatasetBound.filterByOverlappingRegions(regions) else contigsDatasetBound
+  }
+
   /**
    * Load a path name in Parquet + Avro format into a FragmentRDD.
    *
@@ -3052,4 +3197,44 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
       loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
     }
   }
+
+  /**
+   * Return the length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
+   *
+   * If a glob is used, all directories within the glob must be partitioned, and must have been saved
+   * using the same partitioned bin size. Behavior is undefined if this requirement is not met.
+   *
+   * @param pathName Path in which to look for partitioned flag.
+   * @return Length of the partitions in base pairs if the files are partitioned.
+   */
+  private def getPartitionBinSize(pathName: String): Int = {
+
+    val partitionSizes = getFsAndFilesWithFilter(pathName, new FileFilter("_partitionedByStartPos")).map(f => {
+      val is = f.getFileSystem(sc.hadoopConfiguration).open(f)
+      val partitionSize = is.readInt
+      is.close()
+      partitionSize
+    }).toSet
+
+    require(partitionSizes.nonEmpty, "Input Parquet files (%s) are not partitioned.".format(pathName))
+    require(partitionSizes.size == 1, "Found multiple partition sizes (%s).".format(partitionSizes.mkString(", ")))
+    partitionSizes.head
+  }
+
+  /**
+   * Return true if the specified path of Parquet + Avro files is partitioned.
+   *
+   * Behavior is undefined if some paths in the glob contain the _partitionedByStartPos flag file and some do not.
+   *
+   * @param pathName Path in which to look for partitioned flag.
+   * @return True if the specified path of Parquet + Avro files is partitioned.
+   */
+  def isPartitioned(pathName: String): Boolean = {
+    try {
+      getPartitionBinSize(pathName)
+      true
+    } catch {
+      case e: FileNotFoundException => false
+    }
+  }
+
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
index 5bde66f83f..bf5f72681a 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -28,7 +28,7 @@ import htsjdk.variant.vcf.{
   VCFHeaderLineType
 }
 import org.apache.avro.generic.IndexedRecord
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{ FSDataOutputStream, FileSystem, Path }
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkFiles
 import org.apache.spark.api.java.JavaRDD
@@ -36,6 +36,7 @@ import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
+import org.apache.spark.sql.functions._
 import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.models.{
@@ -62,6 +63,7 @@ import scala.collection.JavaConversions._
 import scala.math.min
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
+import scala.util.Try

 private[rdd] class JavaSaveArgs(var outputPath: String,
                                 var blockSize: Int = 128 * 1024 * 1024,
@@ -2204,6 +2206,35 @@ trait DatasetBoundGenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]]
     transformDataset(_.unpersist())
   }

+  val isPartitioned: Boolean
+  val optPartitionBinSize: Option[Int]
+  val optLookbackPartitions: Option[Int]
+
+  private def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion]): String = {
+
+    if (Try(dataset("positionBin")).isSuccess) {
+      regions.map(r =>
+        s"(contigName=\'${r.referenceName}\' and positionBin >= " +
+          s"\'${(scala.math.floor(r.start / optPartitionBinSize.get).toInt) - optLookbackPartitions.get}\'" +
+          s" and positionBin < \'${(scala.math.floor(r.end / optPartitionBinSize.get).toInt + 1)}\'" +
+          s" and (end > ${r.start} and start < ${r.end}))"
+      ).mkString(" or ")
+    } else { // if no positionBin field is found then construct query without bin optimization
+      regions.map(r =>
+        s"(contigName=\'${r.referenceName}\' " +
+          s"and (end > ${r.start} and start < ${r.end}))")
+        .mkString(" or ")
+    }
+  }
+
+  override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion]): V = {
+    if (isPartitioned) {
+      transformDataset((d: Dataset[U]) =>
+        d.filter(referenceRegionsToDatasetQueryString(querys)))
+    } else {
+      super[GenomicDataset].filterByOverlappingRegions(querys)
+    }
+  }
 }

 /**
@@ -2875,4 +2906,43 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
   def saveAsParquet(filePath: java.lang.String) {
     saveAsParquet(new JavaSaveArgs(filePath))
   }
+
+  /**
+   * Save partition size into the partitioned Parquet flag file.
+   *
+   * @param filePath Path to save the file at.
+   * @param partitionSize Partition bin size, in base pairs, used in Hive-style partitioning.
+ * + */ + private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = { + val path = new Path(filePath, "_partitionedByStartPos") + val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration) + val f = fs.create(path) + f.writeInt(partitionSize) + f.close() + } + + /** + * Saves this RDD to disk in range binned partitioned Parquet + Avro format + * + * @param filePath Path to save the file at. + * @param compressCodec Name of the compression codec to use. + * @param partitionSize size of partitions used when writing parquet, in base pairs. Defaults to 1000000. + */ + def saveAsPartitionedParquet(filePath: String, + compressCodec: CompressionCodecName = CompressionCodecName.GZIP, + partitionSize: Int = 1000000) { + log.warn("Saving directly as Hive-partitioned Parquet from SQL. " + + "Options other than compression codec are ignored.") + val df = toDF() + df.withColumn("positionBin", floor(df("start") / partitionSize)) + .write + .partitionBy("contigName", "positionBin") + .format("parquet") + .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase()) + .save(filePath) + writePartitionedParquetFlag(filePath, partitionSize) + saveMetadata(filePath) + } + } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala index 64020c0cc4..a904bb6207 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala @@ -137,7 +137,10 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] ( case class DatasetBoundNucleotideContigFragmentRDD private[rdd] ( dataset: Dataset[NucleotideContigFragmentProduct], - sequences: SequenceDictionary) extends NucleotideContigFragmentRDD + sequences: SequenceDictionary, + override val isPartitioned: Boolean = true, + override val optPartitionBinSize: Option[Int] = Some(1000000), + override val optLookbackPartitions: Option[Int] = Some(1)) extends NucleotideContigFragmentRDD with DatasetBoundGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] { lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala index cce01e85b6..df34a25d76 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala @@ -103,7 +103,10 @@ case class ParquetUnboundCoverageRDD private[rdd] ( */ case class DatasetBoundCoverageRDD private[rdd] ( dataset: Dataset[Coverage], - sequences: SequenceDictionary) extends CoverageRDD + sequences: SequenceDictionary, + override val isPartitioned: Boolean = false, + override val optPartitionBinSize: Option[Int] = None, + override val optLookbackPartitions: Option[Int] = None) extends CoverageRDD with DatasetBoundGenomicDataset[Coverage, Coverage, CoverageRDD] { protected lazy val optPartitionMap = None @@ -167,8 +170,8 @@ abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRD val mergedSequences = iterableRdds.map(_.sequences).fold(sequences)(_ ++ _) if (iterableRdds.forall(rdd => rdd match { - case DatasetBoundCoverageRDD(_, _) => true - case _ => false + case 
DatasetBoundCoverageRDD(_, _, _, _, _) => true + case _ => false })) { DatasetBoundCoverageRDD(iterableRdds.map(_.dataset) .fold(dataset)(_.union(_)), mergedSequences) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala index 7e5ca9f7f4..9da06f8acc 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala @@ -276,7 +276,10 @@ case class ParquetUnboundFeatureRDD private[rdd] ( case class DatasetBoundFeatureRDD private[rdd] ( dataset: Dataset[FeatureProduct], - sequences: SequenceDictionary) extends FeatureRDD + sequences: SequenceDictionary, + override val isPartitioned: Boolean = true, + override val optPartitionBinSize: Option[Int] = Some(1000000), + override val optLookbackPartitions: Option[Int] = Some(1)) extends FeatureRDD with DatasetBoundGenomicDataset[Feature, FeatureProduct, FeatureRDD] { lazy val rdd = dataset.rdd.map(_.toAvro) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala index a1f5f4f16a..ea81061869 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala @@ -187,7 +187,10 @@ case class DatasetBoundFragmentRDD private[rdd] ( dataset: Dataset[FragmentProduct], sequences: SequenceDictionary, recordGroups: RecordGroupDictionary, - @transient val processingSteps: Seq[ProcessingStep]) extends FragmentRDD + @transient val processingSteps: Seq[ProcessingStep], + override val isPartitioned: Boolean = false, + override val optPartitionBinSize: Option[Int] = None, + override val optLookbackPartitions: Option[Int] = None) extends FragmentRDD with DatasetBoundGenomicDataset[Fragment, FragmentProduct, FragmentRDD] { lazy val rdd = dataset.rdd.map(_.toAvro) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 84cc955fd0..41fbcf429b 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -237,7 +237,10 @@ case class DatasetBoundAlignmentRecordRDD private[rdd] ( dataset: Dataset[AlignmentRecordProduct], sequences: SequenceDictionary, recordGroups: RecordGroupDictionary, - @transient val processingSteps: Seq[ProcessingStep]) extends AlignmentRecordRDD + @transient val processingSteps: Seq[ProcessingStep], + override val isPartitioned: Boolean = true, + override val optPartitionBinSize: Option[Int] = Some(1000000), + override val optLookbackPartitions: Option[Int] = Some(1)) extends AlignmentRecordRDD with DatasetBoundGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] { lazy val rdd = dataset.rdd.map(_.toAvro) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala index aa508f63cb..4bcb4a7aaf 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala @@ -149,7 +149,10 @@ case class DatasetBoundGenotypeRDD private[rdd] ( dataset: Dataset[GenotypeProduct], 
sequences: SequenceDictionary, @transient samples: Seq[Sample], - @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends GenotypeRDD + @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines, + override val isPartitioned: Boolean = true, + override val optPartitionBinSize: Option[Int] = Some(1000000), + override val optLookbackPartitions: Option[Int] = Some(1)) extends GenotypeRDD with DatasetBoundGenomicDataset[Genotype, GenotypeProduct, GenotypeRDD] { protected lazy val optPartitionMap = None diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala index f3444c8251..b122dac9c6 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala @@ -136,7 +136,10 @@ case class ParquetUnboundVariantRDD private[rdd] ( case class DatasetBoundVariantRDD private[rdd] ( dataset: Dataset[VariantProduct], sequences: SequenceDictionary, - @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends VariantRDD + @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines, + override val isPartitioned: Boolean = true, + override val optPartitionBinSize: Option[Int] = Some(1000000), + override val optLookbackPartitions: Option[Int] = Some(1)) extends VariantRDD with DatasetBoundGenomicDataset[Variant, VariantProduct, VariantRDD] { protected lazy val optPartitionMap = None diff --git a/adam-core/src/test/resources/multi_chr.sam b/adam-core/src/test/resources/multi_chr.sam new file mode 100644 index 0000000000..5a8a65115b --- /dev/null +++ b/adam-core/src/test/resources/multi_chr.sam @@ -0,0 +1,7 @@ +@SQ SN:1 LN:249250621 +@SQ SN:2 LN:243199373 +@PG ID:p1 PN:myProg CL:"myProg 123" VN:1.0.0 +@PG ID:p2 PN:myProg CL:"myProg 456" VN:1.0.0 PP:p1 +simread:1:26472783:false 16 1 26472784 60 75M * 0 0 GTATAAGAGCAGCCTTATTCCTATTTATAATCAGGGTGAAACACCTGTGCCAATGCCAAGACAGGGGTGCCAAGA * NM:i:0 AS:i:75 XS:i:0 +simread:1:240997787:true 0 1 240997788 60 75M * 0 0 CTTTATTTTTATTTTTAAGGTTTTTTTTGTTTGTTTGTTTTGAGATGGAGTCTCGCTCCACCGCCCAGACTGGAG * NM:i:0 AS:i:75 XS:i:39 +simread:1:189606653:true 0 2 189606654 60 75M * 0 0 TGTATCTTCCTCCCCTGCTGTATGTTTCCTGCCCTCAAACATCACACTCCACGTTCTTCAGCTTTAGGACTTGGA * NM:i:0 AS:i:75 XS:i:0 diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala index be61bf87db..df94032273 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala @@ -140,6 +140,37 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite { assert(fragments3.dataset.count === 8L) } + sparkTest("round trip a ncf to partitioned parquet") { + def testMetadata(fRdd: NucleotideContigFragmentRDD) { + val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L)) + assert(sequenceRdd.sequences.containsRefName("aSequence")) + } + + val fragments1 = sc.loadFasta(testFile("HLA_DQB1_05_01_01_02.fa"), 1000L) + assert(fragments1.rdd.count === 8L) + assert(fragments1.dataset.count === 8L) + testMetadata(fragments1) + + // save using dataset path + val output1 = tmpFile("ctg.adam") + val dsBound = fragments1.transformDataset(ds => ds) + 
testMetadata(dsBound) + dsBound.saveAsPartitionedParquet(output1) + val fragments2 = sc.loadPartitionedParquetContigFragments(output1) + testMetadata(fragments2) + assert(fragments2.rdd.count === 8L) + assert(fragments2.dataset.count === 8L) + + // save using rdd path + val output2 = tmpFile("ctg.adam") + val rddBound = fragments2.transform(rdd => rdd) + testMetadata(rddBound) + rddBound.saveAsPartitionedParquet(output2) + val fragments3 = sc.loadPartitionedParquetContigFragments(output2) + assert(fragments3.rdd.count === 8L) + assert(fragments3.dataset.count === 8L) + } + sparkTest("save fasta back as a single file") { val origFasta = testFile("artificial.fa") val tmpFasta = tmpFile("test.fa") diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala index f42cfcc6a9..556780545e 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala @@ -925,6 +925,29 @@ class FeatureRDDSuite extends ADAMFunSuite { assert(rdd3.dataset.count === 4) } + sparkTest("load partitioned parquet to sql, save, re-read from avro") { + def testMetadata(fRdd: FeatureRDD) { + val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L)) + assert(sequenceRdd.sequences.containsRefName("aSequence")) + } + + val inputPath = testFile("small.1.bed") + val outputPath = tmpLocation() + val rrdd = sc.loadFeatures(inputPath) + testMetadata(rrdd) + val rdd = rrdd.transformDataset(ds => ds) // no-op but force to ds + testMetadata(rdd) + rdd.saveAsPartitionedParquet(outputPath) + val rdd2 = sc.loadPartitionedParquetFeatures(outputPath) + testMetadata(rdd2) + val outputPath2 = tmpLocation() + rdd.transform(rdd => rdd) // no-op but force to rdd + .saveAsPartitionedParquet(outputPath2) + val rdd3 = sc.loadPartitionedParquetFeatures(outputPath2) + assert(rdd3.rdd.count === 4) + assert(rdd3.dataset.count === 4) + } + sparkTest("transform features to contig rdd") { val features = sc.loadFeatures(testFile("sample_coverage.bed")) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index 4225d1acd8..a4cbde4dbe 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -638,6 +638,41 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { assert(rdd3.dataset.count === 20) } + sparkTest("load from sam, save as partitioned parquet, and re-read from partitioned parquet") { + def testMetadata(arRdd: AlignmentRecordRDD) { + val sequenceRdd = arRdd.addSequence(SequenceRecord("aSequence", 1000L)) + assert(sequenceRdd.sequences.containsRefName("aSequence")) + + val rgRdd = arRdd.addRecordGroup(RecordGroup("test", "aRg")) + assert(rgRdd.recordGroups("aRg").sample === "test") + } + + val inputPath = testFile("multi_chr.sam") + val outputPath = tmpLocation() + val rrdd = sc.loadAlignments(inputPath) + testMetadata(rrdd) + + rrdd.saveAsPartitionedParquet(outputPath, partitionSize = 1000000) + val rdd2 = sc.loadPartitionedParquetAlignments(outputPath) + testMetadata(rdd2) + assert(rdd2.rdd.count === 3) + assert(rdd2.dataset.count === 3) + + val rdd3 = sc.loadPartitionedParquetAlignments(outputPath, List(ReferenceRegion("1", 240000000L, 241000000L), 
ReferenceRegion("2", 189000000L, 190000000L))) + assert(rdd3.dataset.count === 2) + assert(rdd3.rdd.count === 2) + + //test explicit transform to Alignment Record Product + val rdd = rrdd.transformDataset(ds => { + import ds.sqlContext.implicits._ + val df = ds.toDF() + df.as[AlignmentRecordProduct] + }) + testMetadata(rdd) + assert(rdd.dataset.count === 3) + assert(rdd.rdd.count === 3) + } + sparkTest("save as SAM format") { val inputPath = testFile("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala index ff04971089..db19e0b03d 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala @@ -128,6 +128,14 @@ class GenotypeRDDSuite extends ADAMFunSuite { assert(starts(752790L)) } + sparkTest("round trip to partitioned parquet") { + val genotypes = sc.loadGenotypes(testFile("small.vcf")) + val outputPath = tmpLocation() + genotypes.saveAsPartitionedParquet(outputPath) + val unfilteredGenotypes = sc.loadPartitionedParquetGenotypes(outputPath) + assert(unfilteredGenotypes.rdd.count === 18) + } + sparkTest("use broadcast join to pull down genotypes mapped to targets") { val genotypesPath = testFile("small.vcf") val targetsPath = testFile("small.1.bed") diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala index 4e258ddc8b..c1957337b7 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala @@ -147,6 +147,21 @@ class VariantRDDSuite extends ADAMFunSuite { assert(starts(752790L)) } + sparkTest("save and reload from partitioned parquet") { + val variants = sc.loadVariants(testFile("sorted-variants.vcf")) + val outputPath = tmpLocation() + variants.saveAsPartitionedParquet(outputPath, partitionSize = 1000000) + val unfilteredVariants = sc.loadPartitionedParquetVariants(outputPath) + assert(unfilteredVariants.rdd.count === 6) + assert(unfilteredVariants.dataset.count === 6) + + val regionsVariants = sc.loadPartitionedParquetVariants(outputPath, + List(ReferenceRegion("2", 19000L, 21000L), + ReferenceRegion("13", 752700L, 752750L))) + assert(regionsVariants.rdd.count === 2) + assert(regionsVariants.dataset.count === 2) + } + sparkTest("use broadcast join to pull down variants mapped to targets") { val variantsPath = testFile("small.vcf") val targetsPath = testFile("small.1.bed") diff --git a/docs/api/adamContext.rst b/docs/api/adamContext.rst index deab7a92ca..8452d48e19 100644 --- a/docs/api/adamContext.rst +++ b/docs/api/adamContext.rst @@ -61,6 +61,7 @@ With an ``ADAMContext``, you can load: - Selected regions from an indexed BAM/CRAM using ``loadIndexedBam`` (Scala, Java, and Python) - From FASTQ using ``loadFastq``, ``loadPairedFastq``, and ``loadUnpairedFastq`` (Scala only) - From Parquet using ``loadParquetAlignments`` (Scala only) + - From partitioned Parquet using ``loadPartitionedParquetAlignments`` (Scala only) - The ``loadAlignments`` method will load from any of the above formats, and will autodetect the underlying format (Scala, Java, Python, and R, also supports loading reads from FASTA) @@ -77,11 +78,13 @@ With an ``ADAMContext``, you can 
load:

   - Selected regions from an indexed BAM/CRAM using ``loadIndexedBam`` (Scala, Java, and Python)
   - From FASTQ using ``loadFastq``, ``loadPairedFastq``, and ``loadUnpairedFastq`` (Scala only)
   - From Parquet using ``loadParquetAlignments`` (Scala only)
+  - From partitioned Parquet using ``loadPartitionedParquetAlignments`` (Scala only)
   - The ``loadAlignments`` method will load from any of the above formats, and will
     autodetect the underlying format (Scala, Java, Python, and R, also supports loading reads from FASTA)
@@ -77,11 +78,13 @@ With an ``ADAMContext``, you can load:

 - Genotypes as a ``GenotypeRDD``:

   - From Parquet using ``loadParquetGenotypes`` (Scala only)
+  - From partitioned Parquet using ``loadPartitionedParquetGenotypes`` (Scala only)
   - From either Parquet or VCF/BCF1 using ``loadGenotypes`` (Scala, Java, Python, and R)

 - Variants as a ``VariantRDD``:

   - From Parquet using ``loadParquetVariants`` (Scala only)
+  - From partitioned Parquet using ``loadPartitionedParquetVariants`` (Scala only)
   - From either Parquet or VCF/BCF1 using ``loadVariants`` (Scala, Java, Python, and R)

 - Genomic features as a ``FeatureRDD``:
@@ -92,12 +95,14 @@ With an ``ADAMContext``, you can load:
   - From NarrowPeak using ``loadNarrowPeak`` (Scala only)
   - From IntervalList using ``loadIntervalList`` (Scala only)
   - From Parquet using ``loadParquetFeatures`` (Scala only)
+  - From partitioned Parquet using ``loadPartitionedParquetFeatures`` (Scala only)
   - Autodetected from any of the above using ``loadFeatures`` (Scala, Java, Python, and R)

 - Fragmented contig sequence as a ``NucleotideContigFragmentRDD``:

   - From FASTA with ``loadFasta`` (Scala only)
   - From Parquet with ``loadParquetContigFragments`` (Scala only)
+  - From partitioned Parquet with ``loadPartitionedParquetContigFragments`` (Scala only)
   - Autodetected from either of the above using ``loadSequences`` (Scala, Java, Python, and R)

 - Coverage data as a ``CoverageRDD``:
diff --git a/docs/api/genomicRdd.rst b/docs/api/genomicRdd.rst
index 287c0fd06a..233ed35ea8 100644
--- a/docs/api/genomicRdd.rst
+++ b/docs/api/genomicRdd.rst
@@ -152,3 +152,100 @@ Similar to ``transform``/``transformDataset``, there exists a
 ``transmuteDataset`` function that enables transformations between
 ``GenomicRDD``\ s of different types.

+Using partitioned Parquet to speed up range-based queries
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+GenomicRDDs of types ``AlignmentRecordRDD``, ``GenotypeRDD``,
+``VariantRDD``, and ``NucleotideContigFragmentRDD`` can be written as Parquet
+using a Hive-style hierarchical directory scheme that is based on contig and
+genomic position. This partitioning reduces the latency of genomic range
+queries against these datasets, which is particularly important for interactive
+applications such as a genomic browser backed by an ADAM dataset.
+
+The ``GenomicRDD`` function
+``GenomicRDD.filterByOverlappingRegions(queryRegionsList)`` builds a Spark SQL
+query that uses this partitioning scheme. This can reduce latencies by more
+than 20x when repeatedly querying a dataset with genomic range filters.
+On a high-coverage alignment dataset, this partitioning strategy improved
+latency from 1-2 minutes to 1-3 seconds when looking up genomic ranges.
+
+**Saving partitioned Parquet files to disk**
+
+A ``GenomicRDD`` can be written to disk as a partitioned Parquet dataset with the
+``GenomicRDD`` function ``saveAsPartitionedParquet``. The optional
+``partitionSize`` parameter defines the width, in base pairs, of the partitions
+within each contig.
+
+.. code:: scala
+
+    data.saveAsPartitionedParquet("dataset1.adam", partitionSize = 2000000)
+
+A partitioned dataset can also be created from an input Parquet or SAM/BAM/CRAM
+file using the ADAM ``transformAlignments`` CLI, or from Parquet/VCF files using the
+ADAM ``transformGenotypes`` CLI.
+
+**Loading partitioned Parquet files**
+
+A ``GenomicRDD`` can be loaded from a partitioned Parquet dataset using the
+``ADAMContext`` function ``loadPartitionedParquet[*]`` specific to each data type,
+such as ``loadPartitionedParquetAlignments``.
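+
+For example, a minimal sketch of a region-limited load (assuming an ``adam-shell`` session
+where ``sc`` has the ``ADAMContext`` implicits in scope, and a previously saved dataset at
+the illustrative path ``alignments.adam``):
+
+.. code:: scala
+
+    import org.bdgenomics.adam.models.ReferenceRegion
+
+    // Only the Parquet files under contigName/positionBin directories that can
+    // overlap the requested region need to be read from disk.
+    val reads = sc.loadPartitionedParquetAlignments(
+      "alignments.adam",
+      regions = List(ReferenceRegion("1", 240000000L, 241000000L)))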
+
+**Layout of a partitioned Parquet directory**
+
+An ADAM partitioned Parquet dataset is written as a three-level directory hierarchy.
+The outermost directory is the name of the dataset and contains the same metadata files
+found in unpartitioned ADAM Parquet datasets. Within the outer dataset directory,
+subdirectories are created with names based on the genomic contigs found
+in the dataset; for example, a subdirectory will be named ``contigName=22`` for chromosome 22.
+Within each contigName directory, there are subdirectories named using a computed value
+``positionBin``, for example a subdirectory named ``positionBin=5``. Records from the
+dataset are written into Parquet files within the appropriate positionBin directory, computed
+based on the start position of the record using the calculation ``floor(start / partitionSize)``.
+For example, when using the default ``partitionSize`` of 1,000,000 base pairs, an
+alignment record with start position 20,100,000 on chromosome 22 would be found in a
+Parquet file at the path ``mydataset.adam/contigName=22/positionBin=20``. The splitting
+of data into one or more Parquet files in these leaf directories is automatic, based on
+Parquet block size settings.
+
+.. code::
+
+    mySavedAdamDataset.adam
+    |
+    |-- _partitionedByStartPos
+    L-- contigName=1
+        L-- positionBin=0
+            |-- part-r-00001.parquet
+            +-- part-r-00002.parquet
+        L-- positionBin=1
+            |-- part-r-00003.parquet
+            |-- part-r-00004.parquet
+        L-- positionBin= ( N bins ...)
+    L-- contigName= ( N contigs ... )
+        |-- (N bins ... )
+
+
+The existence of the file ``_partitionedByStartPos`` can be tested with the public
+function ``ADAMContext.isPartitioned(path)`` and can be used to determine explicitly
+whether an ADAM Parquet dataset is partitioned using this scheme. The partition size that was used
+when the dataset was written to disk is stored in ``_partitionedByStartPos`` and is
+read in as a property of the dataset by the ``loadPartitionedParquet`` functions.
+
+The Spark dataset API recognizes that the field ``positionBin`` is defined implicitly
+by the Parquet files' partitioning scheme, and makes ``positionBin`` available as a field
+that can be queried through the Spark SQL API. ``positionBin`` is used internally by
+the public function ``GenomicRDD.filterByOverlappingRegions``. User code in the ADAM shell or in
+user applications can similarly use the ``positionBin`` field when creating Spark
+SQL queries on a ``genomicRDD.dataset`` backed by partitioned Parquet.
+
+**Re-using a previously loaded partitioned dataset**
+
+When a partitioned dataset is first loaded within an ADAM session, a partition
+discovery/initialization step is performed that can take several minutes for large datasets.
+The original ``GenomicRDD`` object can then be re-used multiple times as the parent
+of different filtration and processing transformations and actions, without incurring
+this initialization cost again. Thus, re-use of a parent partitioned ``GenomicRDD``
+is key to realizing the latency advantages of partitioned datasets described above.
+
+.. code:: scala
+
+    val mydata = sc.loadPartitionedParquetAlignments("alignments.adam")
+    val filteredCount1 = mydata.filterByOverlappingRegions(regions1).dataset.count
+    val filteredCount2 = mydata.filterByOverlappingRegions(regions2).dataset.count
diff --git a/docs/cli/actions.rst b/docs/cli/actions.rst
index b427182adc..982d98c336 100644
--- a/docs/cli/actions.rst
+++ b/docs/cli/actions.rst
@@ -38,9 +38,11 @@ tool takes two required arguments:
 2. 
``OUTPUT``: The path to save the transformed reads to. Supports any of ADAM's read output formats. -Beyond the `default options <#default-args>`__ and the `legacy output -options <#legacy-output>`__, ``transformAlignments`` supports a vast -range of options. These options fall into several general categories: +Beyond the `default options <#default-args>`__, the `legacy output +options <#legacy-output>`__, and the +`partitioned output options <#partitioned-output>`__, +``transformAlignments`` supports a vast range of options. These options +fall into several general categories: - General options: @@ -216,8 +218,9 @@ arguments: 2. ``OUTPUT``: The path to save the transformed genotypes to. Supports any of ADAM's genotype output formats. -Beyond the `default options <#default-args>`__ and the `legacy output -options <#legacy-output>`__, ``transformGenotypes`` +Beyond the `default options <#default-args>`__, the `legacy output +options <#legacy-output>`__, and the +`partitioned output options <#partitioned-output>`__, ``transformGenotypes`` has additional arguments: - ``-coalesce``: Sets the number of partitions to coalesce the output diff --git a/docs/cli/overview.rst b/docs/cli/overview.rst index 66bb88f293..50eb95a174 100644 --- a/docs/cli/overview.rst +++ b/docs/cli/overview.rst @@ -92,3 +92,12 @@ level. This is a three level scale: to the log. - ``SILENT``: If validation fails, ignore the data silently. +Partitioned output +------------------ + +Various commands in ADAM support saving partitioned Parquet output. These +commands take the following parameters, which control the output: + +- ``-partition_by_start_pos``: If true, enables saving a partitioned dataset. +- ``-partition_bin_size``: The size of each partition bin in base pairs. + Defaults to 1Mbp (1,000,000 base pairs).
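+
+As a sketch of typical usage (the input and output paths below are only illustrative)::
+
+    adam-submit transformAlignments \
+      -partition_by_start_pos \
+      -partition_bin_size 1000000 \
+      sample.bam sample.alignments.adam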