[ADAM-651] Hive-style partitioning of parquet files by genomic position #1878
Conversation
Test PASSed.
Thanks for cleaning up the rebase/merge stuff, @jpdna!
* @param pathName The path name to load alignment records from.
*   Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
I don't think this should be part of the API, and in fact simply adding or removing "chr" is not sufficient for converting between the different styles. See e.g. https://github.com/heuermh/dishevelled-bio/blob/master/tools/src/main/java/org/dishevelled/bio/tools/RenameReferences.java#L125 and below
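The kind of conversion referred to here can be sketched as follows. This is a minimal illustration with hypothetical names, not the dishevelled-bio implementation, and it covers only the mitochondrial special case beyond the plain prefix:

```scala
// Hypothetical sketch: converting between reference naming styles needs a
// real mapping, not just adding/removing "chr" (e.g. GRCh-style "MT" maps
// to UCSC-style "chrM", not "chrMT").
def renameReference(name: String, toChrStyle: Boolean): String = {
  if (toChrStyle) {
    name match {
      case "MT" => "chrM"
      case n if n.startsWith("chr") => n
      case n => "chr" + n
    }
  } else {
    name match {
      case "chrM" => "MT"
      case n if n.startsWith("chr") => n.stripPrefix("chr")
      case n => n
    }
  }
}
```

A full solution would also handle other accession-style names, which is why the linked RenameReferences code uses an explicit mapping table.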
Agreed, I don't like it in the API either.
I'll try to push any needed conversion into the application code (Mango): it can look at the sequence dictionary and decide whether a conversion is needed, so that by the time a ReferenceRegion makes it into ADAM code it already uses the correct contig name convention for the underlying source dataset.
@heuermh - I'll plan to use the replacement logic you pointed to - thanks!
+1 towards not including it and pushing it to user level. FYI, you can't link against dishevelled-bio as it is LGPL.
You can't link against dishevelled-bio as it is LGPL.
I'm the only copyright holder in dishevelled-bio, so I could relicense stuff in there if necessary. I don't think this bit is interesting enough to do so, and it only covered the one use case I was interested in. That's why I haven't submitted something identical as a solution for #1757.
* @param pathName The path name to load alignment records from.
*   Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
Remove as above
* @param pathName The path name to load alignment records from.
*   Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
Remove as above
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a FeatureRDD.
*/
Remove extra whitespace
}

datasetBoundFeatureRDD
Remove extra whitespace
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath)
//rdd.context.writePartitionedParquetFlag(filePath)
Remove commented out code
@@ -925,6 +925,33 @@ class FeatureRDDSuite extends ADAMFunSuite {
assert(rdd3.dataset.count === 4)
}
sparkTest("load paritioned parquet to sql, save, re-read from avro") { |
paritioned → partitioned
@@ -638,6 +638,41 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
assert(rdd3.dataset.count === 20)
}
sparkTest("load from sam, save as partitioend parquet, and and re-read from partitioned parquet") { |
partitioend → partitioned
@@ -128,6 +128,15 @@ class GenotypeRDDSuite extends ADAMFunSuite {
assert(starts(752790L))
}
sparkTest("round trip to paritioned parquet") { |
paritioned → partitioned
"Options other than compression codec are ignored.")
val df = toDF()

df.withColumn("posBin", floor(df("start") / partitionSize))
"posBin" → "position" or "positionBin" or "bin"
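The binning scheme in the quoted diff comes down to plain arithmetic; this sketch assumes non-negative start coordinates and the default 1 Mbp partition size seen elsewhere in the PR:

```scala
// Sketch of the partition-bin computation: floor(start / partitionSize).
// For non-negative genomic coordinates, Long integer division is the floor.
def positionBin(start: Long, partitionSize: Int = 1000000): Long =
  start / partitionSize
```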
Test PASSed.
Thanks @jpdna! It looks like this is close to ready!
* @param pathName The path name to load alignment records from.
*   Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
val reads: AlignmentRecordRDD = ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)

val datasetBoundAlignmentRecordRDD: AlignmentRecordRDD = regions match {
  case Some(x) => DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(x)), reads.sequences, reads.recordGroups, reads.processingSteps)
Nit: break long lines.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a VariantRDD
*/
Extra whitespace.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): AlignmentRecordRDD = {
Anywhere you have Option[Iterable[ReferenceRegion]] = None, it should be Iterable[ReferenceRegion] = Iterable.empty.
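The suggested signature change can be illustrated side by side. These are simplified hypothetical signatures for illustration only, not the actual ADAM methods:

```scala
// Simplified stand-in for ADAM's ReferenceRegion (illustration only).
case class ReferenceRegion(referenceName: String, start: Long, end: Long)

// Option-wrapped style: every caller and implementation must unwrap.
def loadOptionStyle(regions: Option[Iterable[ReferenceRegion]] = None): Int =
  regions.map(_.size).getOrElse(0)

// Suggested style: an empty-collection default needs no unwrapping,
// and "no regions" is just the natural empty case.
def loadIterableStyle(regions: Iterable[ReferenceRegion] = Iterable.empty): Int =
  regions.size
```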
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): GenotypeRDD = {
See above comment RE: Option[Iterable[ReferenceRegion]] = None.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a NucleotideContigFragmentRDD
*/
def loadPartitionedParquetFragments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): NucleotideContigFragmentRDD = {
See above comment RE: Option[Iterable[ReferenceRegion]] = None.
* @return Return True if partitioned flag found, False otherwise.
*/

def checkPartitionedParquetFlag(filePath: String): Boolean = {
+1
*/

def checkPartitionedParquetFlag(filePath: String): Boolean = {
  val path = new Path(filePath, "_isPartitionedByStartPos")
Yeah, I'd suggest using the getFsAndFilesWithFilter function above. Behavior should be undefined if you have a glob but not all the paths are partitioned.
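The flag-file pattern under discussion can be sketched with plain java.io for illustration (hypothetical helper names); the actual code uses Hadoop's Path/FileSystem so it also works on HDFS and with globs:

```scala
import java.io.File

// Illustration of the marker-file convention: an empty file named
// "_isPartitionedByStartPos" in the output directory records that the
// Parquet data was written with the Hive-style partitioning scheme.
def writePartitionFlag(dir: File): Boolean =
  new File(dir, "_isPartitionedByStartPos").createNewFile()

def isPartitioned(dir: File): Boolean =
  new File(dir, "_isPartitionedByStartPos").exists()
```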
def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000): String = {

  var regionQueryString = "(contigName=" + "\'" + regions.head.referenceName + "\' and posBin >= \'" +
This will throw if regions.isEmpty; suggest:

regions.map(r => {
  // logic for a single reference region goes here
}).mkString(" or ")
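Applied to the query-string builder, the map/mkString shape might look like this. The exact predicate (bin bounds, overlap semantics) is an assumption for illustration, not the code in this PR:

```scala
// Simplified stand-in for ADAM's ReferenceRegion (illustration only).
case class ReferenceRegion(referenceName: String, start: Long, end: Long)

// Build one disjunct per region and join with "or"; an empty regions
// list now yields an empty string instead of throwing on regions.head.
def regionsToQuery(regions: Iterable[ReferenceRegion],
                   partitionSize: Int = 1000000): String =
  regions.map { r =>
    val startBin = r.start / partitionSize
    val endBin = r.end / partitionSize
    s"(contigName = '${r.referenceName}' and posBin >= $startBin and " +
      s"posBin <= $endBin and start < ${r.end} and end > ${r.start})"
  }.mkString(" or ")
```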
def writePartitionedParquetFlag(filePath: String): Boolean = {
  val path = new Path(filePath, "_isPartitionedByStartPos")
  val fs = path.getFileSystem(toDF().sqlContext.sparkContext.hadoopConfiguration)
+1, should just be rdd.context.hadoopConfiguration
Test PASSed.
Force-pushed 02abc77 to 2a4c022.
Test PASSed.
I believe I have addressed the reviewer requests above, except for the following discussed below:
I'll go ahead and start making those changes, and testing Mango with them, but I'd like to get the rest of this PR through a second pass in parallel. OR - as we do need to get this merged sooner rather than later for Mango - what if we leave the
Test PASSed.
@@ -857,4 +888,5 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {

checkSave(variantContexts)
}
Remove line
done
assert(sequenceRdd.sequences.containsRefName("aSequence"))
}

val inputPath = testFile("small.1.bed")
There are a lot of asserts here. Can you comment their purpose or break them into separate tests?
Done; removed intermediate-step asserts, which were redundant.
genotypes.saveAsPartitionedParquet(outputPath)
val unfilteredGenotypes = sc.loadPartitionedParquetGenotypes(outputPath)
assert(unfilteredGenotypes.rdd.count === 18)
remove line
done
assert(unfilteredVariants.rdd.count === 6)
assert(unfilteredVariants.dataset.count === 6)

val regionsVariants = sc.loadPartitionedParquetVariants(outputPath, List(ReferenceRegion("2", 19000L, 21000L), ReferenceRegion("13", 752700L, 752750L)))
line break
done
@jpdna it may be good to update the Mango PR bigdatagenomics/mango#344 and make sure we have all the functionality we need in Mango included in this PR.
Test PASSed.
Jenkins, retest this please.
Test FAILed. Build result: FAILURE. (All four ADAM-prb matrix builds, Hadoop 2.6.2/2.7.3 with Scala 2.10/2.11, failed; truncated Jenkins log omitted.)
Force-pushed: removed addChrPrefix parameter; addressed PR comments (parts 1-3); fixed nits; rebased; cleaned up whitespace and redundant asserts; fixed isPartitioned.
Force-pushed 309a49f to 23a3bcc.
Test PASSed.
Ping for further review.
*
* @param filePath Path to save the file at.
*/
def writePartitionedParquetFlag(filePath: String): Boolean = {
should this be private?
Agreed, done.
Test PASSed.
As discussed earlier, here are two alternatives for style changes for the load methods:

Always return dataset bound RDD (always-return-dataset-bound.patch.txt):

def loadPartitionedParquetAlignments(
    pathName: String,
    regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = {
  require(isPartitioned(pathName), s"Input Parquet files ($pathName) are not partitioned.")
  val reads = loadParquetAlignments(pathName, optPredicate = None, optProjection = None)
  val dataset = if (regions.nonEmpty) {
    reads.dataset.filter(referenceRegionsToDatasetQueryString(regions))
  } else {
    reads.dataset
  }
  DatasetBoundAlignmentRecordRDD(dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
}

Return unbound or dataset bound RDD (return-unbound-or-dataset-bound.patch.txt):

def loadPartitionedParquetAlignments(
    pathName: String,
    regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = {
  val reads = loadParquetAlignments(pathName, optPredicate = None, optProjection = None)
  val filteredReads = if (regions.nonEmpty) {
    require(isPartitioned(pathName), s"Input Parquet files ($pathName) are not partitioned.")
    DatasetBoundAlignmentRecordRDD(
      reads.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
      reads.sequences,
      reads.recordGroups,
      reads.processingSteps
    )
  } else {
    reads
  }
  filteredReads
}
Sorry, GitHub isn't allowing me to upload the referred-to patches; I'll send them via email.
Thanks @heuermh!
…Parquet return DatasetBound
Test PASSed.
We must not have good enough unit test coverage then, because both patches passed all unit tests. :)
Test FAILed. Build result: FAILURE. (All four ADAM-prb matrix builds, Hadoop 2.6.2/2.7.3 with Scala 2.10/2.11, failed; truncated Jenkins log omitted.)
FYI - I have a working branch where the filtering has been moved into a
Replaced by #1911
Fixes #651
Manually merged changes for the "hive-style" partitioning branch as a single commit on top of master.