Skip to content

Commit

Permalink
[ADAM-1053] Clean up if-clauses in Transform CLI.
Browse files Browse the repository at this point in the history
Moves all if clauses into `maybeX` methods. Resolves bigdatagenomics#1053.
  • Loading branch information
fnothaft committed Jul 19, 2016
1 parent 06d0fd2 commit c251f79
Showing 1 changed file with 167 additions and 41 deletions.
208 changes: 167 additions & 41 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,38 +124,66 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

// validation stringency parsed from the user-supplied -stringency argument;
// passed through to base quality recalibration
val stringency = ValidationStringency.valueOf(args.stringency)

def apply(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {

var adamRecords = rdd
val sc = rdd.rdd.context
val sl = StorageLevel.fromString(args.storageLevel)

val stringencyOpt = Option(args.stringency).map(ValidationStringency.valueOf(_))

/**
* @param rdd An RDD of reads.
* @return If the repartition argument is set, repartitions the input dataset
* to the number of partitions requested by the user. Forces a shuffle using
* a hash partitioner.
*/
private def maybeRepartition(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {
  // any value other than -1 is treated as a request to repartition
  if (args.repartition != -1) {
    log.info("Repartitioning reads to '%d' partitions".format(args.repartition))
    // operate on the input parameter and return the result; there is no
    // shared mutable `adamRecords` variable in this method's scope
    rdd.transform(_.repartition(args.repartition))
  } else {
    rdd
  }
}

/**
* @param rdd The RDD of reads that was the output of [[maybeRepartition]].
* @return If the user has provided the `-mark_duplicates` flag, returns a RDD
* where reads have been marked as duplicates if they appear to be from
* duplicated fragments. Else, returns the input RDD.
*/
private def maybeDedupe(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {
  if (args.markDuplicates) {
    log.info("Marking duplicates")
    // the marked RDD is the value of this branch; nothing is mutated
    rdd.markDuplicates()
  } else {
    rdd
  }
}

/**
* @param rdd The RDD of reads that was output by [[maybeDedupe]].
* @param sl The requested storage level for caching, if requested.
* Realignment is a two pass algorithm (generate targets, then realign), so
* we allow users to request caching with the -cache flag.
* @return If the user has set the -realign_indels flag, we realign the input
* RDD against a set of targets. If the user has provided a known set of
* INDELs with the -known_indels flag, we realign against those indels.
* Else, we generate candidate INDELs from the reads and then realign. If
* -realign_indels is not set, we return the input RDD.
*/
private def maybeRealign(rdd: AlignmentRecordRDD,
sl: StorageLevel): AlignmentRecordRDD = {
if (args.locallyRealign) {
val oldRdd = if (args.cache) {
adamRecords.transform(_.persist(sl))
} else {
adamRecords
}

log.info("Locally realigning indels.")

// has the user asked us to cache the rdd before multi-pass stages?
if (args.cache) {
rdd.rdd.persist(sl)
}

// fold and get the consensus model for this realignment run
val consensusGenerator = Option(args.knownIndelsFile)
.fold(new ConsensusGeneratorFromReads().asInstanceOf[ConsensusGenerator])(
new ConsensusGeneratorFromKnowns(_, sc).asInstanceOf[ConsensusGenerator]
new ConsensusGeneratorFromKnowns(_, rdd.rdd.context).asInstanceOf[ConsensusGenerator]
)

adamRecords = oldRdd.realignIndels(
// run realignment
val realignmentRdd = rdd.realignIndels(
consensusGenerator,
isSorted = false,
args.maxIndelSize,
Expand All @@ -164,75 +192,173 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
args.maxTargetSize
)

// unpersist our input, if persisting was requested
// we don't reference said rdd again, so unpersisting is ok
if (args.cache) {
oldRdd.rdd.unpersist()
rdd.rdd.unpersist()
}

realignmentRdd
} else {
rdd
}
}

/**
* @param rdd The RDD of reads that was output by [[maybeRealign]].
* @param sl The requested storage level for caching, if requested.
* BQSR is a two pass algorithm (generate table, then recalibrate), so
* we allow users to request caching with the -cache flag.
* @return If the user has set the -recalibrate_base_qualities flag, we
* estimate the empirical base qualities and recalibrate the reads. If
* the -known_snps flag is set, we use that file to mask off sites of
* known variation when recalibrating. If BQSR has not been requested,
* we return the input RDD.
*/
private def maybeRecalibrate(rdd: AlignmentRecordRDD,
                             sl: StorageLevel): AlignmentRecordRDD = {
  if (args.recalibrateBaseQualities) {

    log.info("Recalibrating base qualities")

    // bqsr is a two pass algorithm, so cache the rdd if requested
    if (args.cache) {
      rdd.rdd.persist(sl)
    }

    // the spark context is taken from the input rdd, as this method has no
    // context of its own in scope
    val sc = rdd.rdd.context

    // create the known sites file, if one is available
    val knownSnps: SnpTable = createKnownSnpsTable(sc)

    // run bqsr
    val bqsredRdd = rdd.recalibateBaseQualities(
      sc.broadcast(knownSnps),
      Option(args.observationsPath),
      stringency
    )

    // if we cached the input, unpersist it, as it is never reused after bqsr
    if (args.cache) {
      rdd.rdd.unpersist()
    }

    bqsredRdd
  } else {
    rdd
  }
}

/**
* @param rdd The RDD of reads that was output by [[maybeRecalibrate]].
* @return If -coalesce is set, coalesces the RDD to the number of partitions
* requested. Forces a shuffle if the number of partitions requested is
* larger than the current number of partitions, or if the -force_shuffle
* flag is set. If -coalesce was not requested, returns the input RDD.
*/
private def maybeCoalesce(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {
  // any value other than -1 is treated as a request to coalesce
  if (args.coalesce != -1) {
    log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))

    // a shuffle is required when growing the partition count, since a
    // shuffle-free coalesce can only merge existing partitions; the user
    // can also force a shuffle explicitly
    val shuffle = args.coalesce > rdd.rdd.partitions.size || args.forceShuffle

    rdd.transform(_.coalesce(args.coalesce, shuffle = shuffle))
  } else {
    rdd
  }
}

// NOTE: For now, sorting needs to be the last transform
/**
* @param rdd The RDD of reads that was output by [[maybeCoalesce]].
* @param sl The requested storage level for caching, if requested.
* @return If -sortReads was set, returns the sorted reads. If
* -sort_lexicographically is additionally set, sorts lexicographically,
* instead of by contig index. If no sorting was requested, returns
* the input RDD.
*/
private def maybeSort(rdd: AlignmentRecordRDD,
                      sl: StorageLevel): AlignmentRecordRDD = {
  if (args.sortReads) {

    // cache the input if requested. sort is two stages:
    // 1. sample to create partitioner
    // 2. repartition and sort within partitions
    if (args.cache) {
      rdd.rdd.persist(sl)
    }

    log.info("Sorting reads")

    // are we sorting lexicographically or using legacy SAM sort order?
    val sortedRdd = if (args.sortLexicographically) {
      rdd.sortReadsByReferencePosition()
    } else {
      rdd.sortReadsByReferencePositionAndIndex()
    }

    // unpersist the cached rdd, if caching was requested
    if (args.cache) {
      rdd.rdd.unpersist()
    }

    sortedRdd
  } else {
    rdd
  }
}

/**
* @param rdd The RDD of reads that was output by [[maybeSort]].
* @param stringencyOpt An optional validation stringency.
* @return If -add_md_tags is set, recomputes the MD tags for the reads
* provided. If some reads have tags, -md_tag_overwrite controls whether
* these tags are recomputed or not. If MD tagging isn't requested, we
* return the input RDD.
*/
def maybeMdTag(rdd: AlignmentRecordRDD,
               stringencyOpt: Option[ValidationStringency]): AlignmentRecordRDD = {
  // a null reference file means that MD tagging was not requested
  if (args.mdTagsReferenceFile != null) {
    log.info(s"Adding MDTags to reads based on reference file ${args.mdTagsReferenceFile}")
    // the lambda parameter is named `r` so that it does not shadow the
    // method's own `rdd` parameter
    rdd.transform(r => {
      MDTagging(
        r,
        args.mdTagsReferenceFile,
        fragmentLength = args.mdTagsFragmentSize,
        overwriteExistingTags = args.mdTagsOverwrite,
        // fall back to strict validation if no stringency was provided
        validationStringency = stringencyOpt.getOrElse(ValidationStringency.STRICT)
      )
    })
  } else {
    rdd
  }
}

/**
 * Runs the requested read transformations as a chain, where each stage
 * either transforms its input or passes it through unchanged.
 *
 * @param rdd The RDD of reads to transform.
 * @return The reads after all of the stages requested on the command line
 *   have been applied.
 */
def apply(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {

  // storage level used by the multi-pass stages when caching is requested
  val sl = StorageLevel.fromString(args.storageLevel)

  val stringencyOpt = Option(args.stringency).map(ValidationStringency.valueOf(_))

  // first repartition if needed
  val initialRdd = maybeRepartition(rdd)

  // then, mark duplicates, if desired
  val maybeDedupedRdd = maybeDedupe(initialRdd)

  // once we've deduped our reads, maybe realign them
  val maybeRealignedRdd = maybeRealign(maybeDedupedRdd, sl)

  // run BQSR
  val maybeRecalibratedRdd = maybeRecalibrate(maybeRealignedRdd, sl)

  // does the user want us to coalesce before saving?
  val finalPreprocessedRdd = maybeCoalesce(maybeRecalibratedRdd)

  // NOTE: For now, sorting needs to be the last transform
  val maybeSortedRdd = maybeSort(finalPreprocessedRdd, sl)

  // recompute md tags, if requested, and return
  maybeMdTag(maybeSortedRdd, stringencyOpt)
}

def forceNonParquet(): Boolean = {
Expand Down

0 comments on commit c251f79

Please sign in to comment.