[ADAM-685] add .interval_list file parser
ryan-williams authored and fnothaft committed May 31, 2015
1 parent d5255ae commit 35fbbb6
Showing 4 changed files with 600 additions and 5 deletions.
31 changes: 27 additions & 4 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -370,6 +370,26 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
     if (Metrics.isRecording) records.instrument() else records
   }

+  def loadIntervalList(filePath: String): RDD[Feature] = {
+    val parsedLines = sc.textFile(filePath).map(new IntervalListParser().parse)
+    val (seqDict, records) = (SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*), parsedLines.flatMap(_._2))
+    val seqDictMap = seqDict.records.map(sr => sr.name -> sr).toMap
+    val recordsWithContigs = for {
+      record <- records
+      seqRecord <- seqDictMap.get(record.getContig.getContigName)
+    } yield Feature.newBuilder(record)
+      .setContig(
+        Contig.newBuilder()
+          .setContigName(seqRecord.name)
+          .setReferenceURL(seqRecord.url.getOrElse(null))
+          .setContigMD5(seqRecord.md5.getOrElse(null))
+          .setContigLength(seqRecord.length)
+          .build()
+      )
+      .build()
+    if (Metrics.isRecording) recordsWithContigs.instrument() else recordsWithContigs
+  }
+
   def loadParquetFeatures(
     filePath: String,
     predicate: Option[FilterPredicate] = None,
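For reference, a minimal sketch of the input this loader expects, plus a hypothetical call; the file contents and the ADAMContext value ac are illustrative, not part of this commit. The layout follows the Picard interval_list convention (a SAM-style @-header, then whitespace-separated contig/start/end/strand/attribute columns), and note that the parser added below requires SN, LN, UR and M5 on every @SQ line:

  // targets.interval_list (illustrative values; columns tab- or space-separated):
  //   @SQ  SN:chr1  LN:248956422  UR:file:/refs/sample.fa  M5:0123456789abcdef0123456789abcdef
  //   chr1  10001  10468  +  gn|GENE1;note|promoter
  val features: RDD[Feature] = ac.loadIntervalList("targets.interval_list")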
@@ -418,18 +438,21 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
     projection: Option[Schema] = None): RDD[Feature] = {

     if (filePath.endsWith(".bed")) {
-      log.info("Loading " + filePath + " as BED and converting to features. Projection is ignored.")
+      log.info(s"Loading $filePath as BED and converting to features. Projection is ignored.")
       loadBED(filePath)
     } else if (filePath.endsWith(".gtf") ||
       filePath.endsWith(".gff")) {
-      log.info("Loading " + filePath + " as GTF/GFF and converting to features. Projection is ignored.")
+      log.info(s"Loading $filePath as GTF/GFF and converting to features. Projection is ignored.")
       loadGTF(filePath)
     } else if (filePath.endsWith(".narrowPeak") ||
       filePath.endsWith(".narrowpeak")) {
-      log.info("Loading " + filePath + " as NarrowPeak and converting to features. Projection is ignored.")
+      log.info(s"Loading $filePath as NarrowPeak and converting to features. Projection is ignored.")
       loadNarrowPeak(filePath)
+    } else if (filePath.endsWith(".interval_list")) {
+      log.info(s"Loading $filePath as IntervalList and converting to features. Projection is ignored.")
+      loadIntervalList(filePath)
     } else {
-      log.info("Loading " + filePath + " as Parquet containing Features.")
+      log.info(s"Loading $filePath as Parquet containing Features.")
       loadParquetFeatures(filePath, None, projection)
     }
   }
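As a usage sketch: with the branch above in place, this extension-based dispatcher (its signature is partially collapsed above; in ADAM it is ADAMContext's loadFeatures) routes .interval_list paths to the new loader, so a hypothetical call such as

  val features = ac.loadFeatures("targets.interval_list")

logs the IntervalList message and delegates to loadIntervalList. As before, ac is an assumed ADAMContext value, not something introduced by this commit.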
@@ -19,7 +19,8 @@ package org.bdgenomics.adam.rdd.features

 import java.io.File
 import java.util.UUID
-import org.bdgenomics.formats.avro.{ Contig, Strand, Feature }
+import org.bdgenomics.adam.models.SequenceRecord
+import org.bdgenomics.formats.avro.{ Dbxref, Contig, Strand, Feature }
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer

@@ -118,6 +119,72 @@ class GTFParser extends FeatureParser {
   }
 }

+class IntervalListParser extends Serializable {
+  def parse(line: String): (Option[SequenceRecord], Option[Feature]) = {
+    val fields = line.split("[ \t]+")
+    if (fields.length < 2) {
+      (None, None)
+    } else {
+      if (fields(0).startsWith("@")) {
+        if (fields(0).startsWith("@SQ")) {
+          val (name, length, url, md5) = {
+            val attrs = fields.drop(1).map(field => field.split(":", 2) match {
+              case Array(key, value) => key -> value
+              case x => throw new Exception(s"Expected fields of the form 'key:value' in field $field but got: $x. Line:\n$line")
+            }).toMap
+
+            // Require that all @SQ lines have name, length, url, md5.
+            (attrs("SN"), attrs("LN").toLong, attrs("UR"), attrs("M5"))
+          }
+
+          (Some(SequenceRecord(name, length, md5, url)), None)
+        } else {
+          (None, None)
+        }
+      } else {
+        if (fields.length < 4) {
+          throw new Exception(s"Invalid line: $line")
+        }
+
+        val (dbxrfs, attrs: Map[String, String]) =
+          (if (fields.length < 5 || fields(4) == "." || fields(4) == "-") {
+            (Nil, Map())
+          } else {
+            val a = fields(4).split(Array(';', ',')).map(field => field.split('|') match {
+              case Array(key, value) =>
+                key match {
+                  case "gn" | "ens" | "vega" | "ccds" => (Some(Dbxref.newBuilder().setDb(key).setAccession(value).build()), None)
+                  case _ => (None, Some(key -> value))
+                }
+              case x => throw new Exception(s"Expected fields of the form 'key|value;' but got: $field. Line:\n$line")
+            })
+
+            (a.flatMap(_._1).toList, a.flatMap(_._2).toMap)
+          })
+
+        (
+          None,
+          Some(
+            Feature.newBuilder()
+              .setContig(Contig.newBuilder().setContigName(fields(0)).build())
+              .setStart(fields(1).toLong)
+              .setEnd(fields(2).toLong)
+              .setStrand(fields(3) match {
+                case "+" => Strand.Forward
+                case "-" => Strand.Reverse
+                case _ => Strand.Independent
+              })
+              .setAttributes(attrs)
+              .setDbxrefs(dbxrfs)
+              .build()
+          )
+        )
+      }
+    }
+  }
+
+}
+
 class BEDParser extends FeatureParser {

   override def parse(line: String): Seq[Feature] = {
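A quick sketch of the parser's per-line contract under the code in this diff (inputs illustrative): header lines yield a SequenceRecord in the first slot, interval lines yield a Feature in the second.

  val parser = new IntervalListParser()

  // @SQ header line (SN, LN, UR and M5 are all required by the parser above):
  val (Some(sq), None) =
    parser.parse("@SQ\tSN:chr1\tLN:248956422\tUR:file:/refs/sample.fa\tM5:0123456789abcdef0123456789abcdef")

  // Interval line: 'gn' maps to a Dbxref; unrecognized keys become generic attributes.
  val (None, Some(feature)) =
    parser.parse("chr1\t10001\t10468\t+\tgn|GENE1;note|promoter")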
