fixes to properly load the VCF samples dict from _samples.avro #9

Status: Open. Wants to merge 4 commits into base branch genotypes-rdd.
13 changes: 10 additions & 3 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Vcf.scala
@@ -64,15 +64,22 @@ class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] wit
if (dictionary.isDefined)
log.info("Using contig translation")

val adamGTs: RDD[Genotype] = sc.loadParquetGenotypes(args.adamFile)
val adamGTs = sc.loadParquetGenotypes(args.adamFile)

val coalesce = if (args.coalesce > 0) {
Some(args.coalesce)
} else {
None
}

adamGTs.toVariantContext
.saveAsVcf(args.outputPath, dict = dictionary, sortOnSave = args.sort, coalesceTo = coalesce)
// convert to variant contexts and prep for save
val variantContexts = adamGTs.toVariantContextRDD
val variantContextsToSave = if (args.coalesce > 0) {
variantContexts.transform(_.coalesce(args.coalesce))
} else {
variantContexts
}

variantContextsToSave.saveAsVcf(args.outputPath, sortOnSave = args.sort)
}
}
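For context, a minimal usage sketch of the new save path, assuming the GenotypeRDD/VariantContextRDD API from the genotypes-rdd base branch and hypothetical file paths; the sequence dictionary and sample metadata now travel with the RDD, so saveAsVcf no longer takes a dict argument:

    import org.apache.spark.SparkContext
    import org.bdgenomics.adam.rdd.ADAMContext._

    def genotypesToVcf(sc: SparkContext): Unit = {
      // hypothetical input path: genotypes previously saved as Parquet
      val genotypes = sc.loadParquetGenotypes("genotypes.adam")

      // convert to variant contexts; metadata rides along with the wrapper
      val variantContexts = genotypes.toVariantContextRDD

      // optionally shrink the partition count, then write the VCF
      variantContexts
        .transform(_.coalesce(1))
        .saveAsVcf("genotypes.vcf", sortOnSave = true)
    }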
@@ -55,7 +55,7 @@ object AlleleCountHelper extends Serializable {

def countAlleles(adamVariants: RDD[Genotype], args: AlleleCountArgs) {
val usefulData = adamVariants.map(p => (
p.getVariant.getContig.getContigName,
p.getVariant.getContigName,
p.getVariant.getStart,
p.getVariant.getReferenceAllele,
p.getVariant.getAlternateAllele,
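The same schema flattening recurs throughout the patch: contigName is now a plain string on Variant (and Genotype) rather than a nested Contig record. A minimal before/after sketch, assuming the flattened bdg-formats schema used on this branch:

    import org.bdgenomics.formats.avro.Variant

    // before: the contig name was nested inside a Contig record
    //   variant.getContig.getContigName
    // after: it is a flat string field on the Variant itself
    def contigNameOf(variant: Variant): String = variant.getContigName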
20 changes: 11 additions & 9 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Vcf2ADAM.scala
@@ -65,22 +65,24 @@ class Vcf2ADAM(val args: Vcf2ADAMArgs) extends BDGSparkCommand[Vcf2ADAMArgs] wit
if (dictionary.isDefined)
log.info("Using contig translation")

var adamVariants: RDD[VariantContext] = sc.loadVcf(args.vcfPath, sd = dictionary)
if (args.coalesce > 0) {
if (args.coalesce > adamVariants.partitions.size || args.forceShuffle) {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = true)
val variantContextRdd = sc.loadVcf(args.vcfPath, sdOpt = dictionary)
var variantContextsToSave = if (args.coalesce > 0) {
if (args.coalesce > variantContextRdd.partitions.size || args.forceShuffle) {
variantContextRdd.transform(_.coalesce(args.coalesce, shuffle = true))
} else {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = false)
variantContextRdd.transform(_.coalesce(args.coalesce, shuffle = false))
}
} else {
variantContextRdd
}

if (args.onlyVariants) {
adamVariants
.map(v => v.variant.variant)
variantContextsToSave
.toVariantRDD
.saveAsParquet(args)
} else {
adamVariants
.flatMap(p => p.genotypes)
variantContextsToSave
.toGenotypeRDD
.saveAsParquet(args)
}
}
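The branching above follows one rule: a coalesce without a shuffle can only shrink the partition count, so a shuffle is required when the requested count is larger than the current one, or when the user forces it. The same rule on a plain Spark RDD, as a minimal sketch:

    import org.apache.spark.rdd.RDD

    // shuffle only when growing the partition count or when explicitly forced
    def repartitionTo[T](rdd: RDD[T], requested: Int, forceShuffle: Boolean): RDD[T] = {
      if (requested <= 0) {
        rdd
      } else {
        val shuffle = requested > rdd.partitions.size || forceShuffle
        rdd.coalesce(requested, shuffle = shuffle)
      }
    }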
@@ -48,8 +48,8 @@ class FlattenSuite extends ADAMFunSuite {

assert(records.size === 15)
assert(records(0).getSampleId === "NA12878")
assert(records(0).getVariant.getStart == 14396L)
assert(records(0).getVariant.getEnd == 14400L)
assert(records(0).getStart == 14396L)
assert(records(0).getEnd == 14400L)

val flattenArgLine = "%s %s".format(outputPath, flatPath).split("\\s+")
val flattenArgs: FlattenArgs = Args4j.apply[FlattenArgs](flattenArgLine)
@@ -61,8 +61,8 @@ class FlattenSuite extends ADAMFunSuite {

assert(flatRecords.size === 15)
assert(flatRecords(0).get("sampleId") === "NA12878")
assert(flatRecords(0).get("variant__start") === 14396L)
assert(flatRecords(0).get("variant__end") === 14400L)
assert(flatRecords(0).get("start") === 14396L)
assert(flatRecords(0).get("end") === 14400L)
}

}
@@ -40,7 +40,7 @@ class ConsensusGeneratorFromKnowns(file: String, @transient sc: SparkContext) ex
val rdd: RDD[Variant] = sc.loadVariants(file)

Some(rdd.filter(v => v.getReferenceAllele.length != v.getAlternateAllele.length)
.map(v => ReferenceRegion(v.getContig.getContigName, v.getStart, v.getStart + v.getReferenceAllele.length))
.map(v => ReferenceRegion(v.getContigName, v.getStart, v.getStart + v.getReferenceAllele.length))
.map(r => new IndelRealignmentTarget(Some(r), r)))
}

@@ -218,18 +218,14 @@ class VariantContextConverter(dict: Option[SequenceDictionary] = None) extends S
extractVariantDatabaseAnnotation(variant, vc)
}

private def createContig(vc: BroadVariantContext): Contig = {
val contigName = contigToRefSeq.getOrElse(vc.getChr, vc.getChr)

Contig.newBuilder()
.setContigName(contigName)
.build()
private def createContig(vc: BroadVariantContext): String = {
contigToRefSeq.getOrElse(vc.getChr, vc.getChr)
}

private def createADAMVariant(vc: BroadVariantContext, alt: Option[String]): Variant = {
// VCF CHROM, POS, REF and ALT
val builder = Variant.newBuilder
.setContig(createContig(vc))
.setContigName(createContig(vc))
.setStart(vc.getStart - 1 /* ADAM is 0-indexed */ )
.setEnd(vc.getEnd /* ADAM is 0-indexed, so the 1-indexed inclusive end becomes exclusive */ )
.setReferenceAllele(vc.getReference.getBaseString)
@@ -255,10 +251,23 @@
annotations: VariantCallingAnnotations,
setPL: (htsjdk.variant.variantcontext.Genotype, Genotype.Builder) => Unit): Seq[Genotype] = {

// dupe variant, get contig name/start/end and null out
val contigName = variant.getContigName
val start = variant.getStart
val end = variant.getEnd
val newVariant = Variant.newBuilder(variant)
.setContigName(null)
.setStart(null)
.setEnd(null)
.build()

val genotypes: Seq[Genotype] = vc.getGenotypes.map(
(g: htsjdk.variant.variantcontext.Genotype) => {
val genotype: Genotype.Builder = Genotype.newBuilder
.setVariant(variant)
.setVariant(newVariant)
.setContigName(contigName)
.setStart(start)
.setEnd(end)
.setVariantCallingAnnotations(annotations)
.setSampleId(g.getSampleName)
.setAlleles(g.getAlleles.map(VariantContextConverter.convertAllele(vc, _)))
@@ -332,8 +341,8 @@ class VariantContextConverter(dict: Option[SequenceDictionary] = None) extends S
val variant: Variant = vc.variant
val vcb = new VariantContextBuilder()
.chr(refSeqToContig.getOrElse(
variant.getContig.getContigName,
variant.getContig.getContigName
variant.getContigName,
variant.getContigName
))
.start(variant.getStart + 1 /* Recall ADAM is 0-indexed */ )
.stop(variant.getStart + variant.getReferenceAllele.length)
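With this change a Genotype carries contigName/start/end directly, and its nested Variant copy has those fields nulled so the position is not stored twice. Consumers that need a fully populated Variant back can copy the genotype-level fields in; a hypothetical helper (not part of this patch) sketching that reconstruction:

    import org.bdgenomics.formats.avro.{ Genotype, Variant }

    // hypothetical helper, not part of this patch: rebuild a fully populated
    // Variant from a genotype whose nested variant has null position fields
    def rebuildVariant(g: Genotype): Variant = {
      Variant.newBuilder(g.getVariant)
        .setContigName(g.getContigName)
        .setStart(g.getStart)
        .setEnd(g.getEnd)
        .build()
    }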
@@ -70,7 +70,7 @@ object IndelTable {
def apply(variants: RDD[Variant]): IndelTable = {
val consensus: Map[String, Iterable[Consensus]] = variants.filter(v => v.getReferenceAllele.length != v.getAlternateAllele.length)
.map(v => {
val referenceName = v.getContig.getContigName
val referenceName = v.getContigName
val consensus = if (v.getReferenceAllele.length > v.getAlternateAllele.length) {
// deletion
val deletionLength = v.getReferenceAllele.length - v.getAlternateAllele.length
@@ -60,7 +60,7 @@ object ReferencePosition extends Serializable {
* @return The reference position of this variant.
*/
def apply(variant: Variant): ReferencePosition = {
new ReferencePosition(variant.getContig.getContigName, variant.getStart)
new ReferencePosition(variant.getContigName, variant.getStart)
}

/**
@@ -70,8 +70,20 @@
* @return The reference position of this genotype.
*/
def apply(genotype: Genotype): ReferencePosition = {
val variant = genotype.getVariant
new ReferencePosition(variant.getContig.getContigName, variant.getStart)
val contigNameSet = Seq(Option(genotype.getContigName), Option(genotype.getVariant.getContigName))
.flatten
.toSet
val startSet = Seq(Option(genotype.getStart), Option(genotype.getVariant.getStart))
.flatten
.toSet
require(contigNameSet.nonEmpty, "Genotype has no contig name: %s".format(genotype))
require(contigNameSet.size == 1, "Genotype has multiple contig names: %s, %s".format(
contigNameSet, genotype))
require(startSet.nonEmpty, "Genotype has no start: %s".format(genotype))
require(startSet.size == 1, "Genotype has multiple starts: %s, %s".format(
startSet, genotype))

new ReferencePosition(contigNameSet.head, startSet.head)
}

def apply(referenceName: String, pos: Long): ReferencePosition = {
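With the checks above, ReferencePosition(genotype) accepts the position from either the flattened genotype fields or the nested variant, as long as whichever values are present agree, and fails fast on missing or conflicting positions. A small usage sketch with hypothetical values, assuming the bdg-formats builders default unset fields to null:

    import org.bdgenomics.adam.models.ReferencePosition
    import org.bdgenomics.formats.avro.{ Genotype, Variant }

    // hypothetical genotype carrying only the flattened position fields
    val genotype = Genotype.newBuilder()
      .setVariant(Variant.newBuilder().build())
      .setContigName("1")
      .setStart(14396L)
      .setEnd(14400L)
      .build()

    val pos = ReferencePosition(genotype) // ReferencePosition("1", 14396L)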
@@ -21,6 +21,7 @@ import org.apache.avro.generic.IndexedRecord
import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment, Contig }
import org.bdgenomics.adam.rdd.ADAMContext._
import htsjdk.samtools.{ SamReader, SAMFileHeader, SAMSequenceRecord, SAMSequenceDictionary }
import htsjdk.variant.vcf.VCFHeader
import scala.collection._

/**
@@ -45,6 +46,16 @@ object SequenceDictionary {
new SAMSequenceDictionary(dictionary.records.map(SequenceRecord.toSAMSequenceRecord).toList)
}

/**
* Creates a sequence dictionary from a sequence of Avro Contigs.
*
* @param contigs Seq of Contig records.
* @return Returns a sequence dictionary.
*/
def fromAvro(contigs: Seq[Contig]): SequenceDictionary = {
new SequenceDictionary(contigs.map(SequenceRecord.fromADAMContig).toVector)
}

/**
* Extracts a SAM sequence dictionary from a SAM file header and returns an
* ADAM sequence dictionary.
@@ -60,6 +71,22 @@
fromSAMSequenceDictionary(samDict)
}

/**
* Extracts a SAM sequence dictionary from a VCF header and returns an
* ADAM sequence dictionary.
*
* @see fromSAMHeader
*
* @param header VCF file header.
* @return Returns an ADAM style sequence dictionary.
*/
def fromVCFHeader(header: VCFHeader): SequenceDictionary = {
val samDict = header.getSequenceDictionary

// vcf files can have null sequence dictionaries
Option(samDict).fold(SequenceDictionary.empty)(ssd => fromSAMSequenceDictionary(ssd))
}

/**
* Converts a picard/samtools SAMSequenceDictionary into an ADAM sequence dictionary.
*
@@ -156,6 +183,13 @@ class SequenceDictionary(val records: Vector[SequenceRecord]) extends Serializab
override def toString: String = {
records.map(_.toString).fold("SequenceDictionary{")(_ + "\n" + _) + "}"
}

private[adam] def toAvro: Seq[Contig] = {
records.map(SequenceRecord.toADAMContig)
.toSeq
}

def isEmpty: Boolean = records.isEmpty
}

object SequenceOrderingByName extends Ordering[SequenceRecord] {
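The new fromAvro, fromVCFHeader, and toAvro helpers let the sequence dictionary be recovered from a VCF header and round-tripped through the Avro Contig records used for sidecar metadata. A minimal sketch, assuming it runs inside the org.bdgenomics.adam package (toAvro is package-private) and using a hypothetical VCF path:

    import java.io.File
    import htsjdk.variant.vcf.VCFFileReader
    import org.bdgenomics.adam.models.SequenceDictionary
    import org.bdgenomics.formats.avro.Contig

    // build an ADAM dictionary from a VCF header; empty if the header has none
    val reader = new VCFFileReader(new File("input.vcf"), false)
    val dict = SequenceDictionary.fromVCFHeader(reader.getFileHeader)
    reader.close()

    // round trip through the Avro Contig records
    val contigs: Seq[Contig] = dict.toAvro
    val restored = SequenceDictionary.fromAvro(contigs)
    assert(restored.records.size == dict.records.size)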
@@ -94,7 +94,7 @@ object SnpTable {
}

def apply(variants: RDD[RichVariant]): SnpTable = {
val positions = variants.map(variant => (variant.getContig.getContigName, variant.getStart)).collect()
val positions = variants.map(variant => (variant.getContigName, variant.getStart)).collect()
val table = new mutable.HashMap[String, mutable.HashSet[Long]]
positions.foreach(tup => table.getOrElseUpdate(tup._1, { new mutable.HashSet[Long] }) += tup._2)
new SnpTable(table.mapValues(_.toSet).toMap)
@@ -19,7 +19,6 @@ package org.bdgenomics.adam.models

import org.bdgenomics.formats.avro.{ Genotype, DatabaseVariantAnnotation, Variant }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.adam.rich.RichVariant._

/**
* Note: VariantContext inherits its name from the Picard VariantContext, and is not related to the SparkContext object.