Merge pull request #136 from tomwhite/tw_unmapped
Add a way to get unmapped reads from a BAM.
tomwhite authored Aug 24, 2017
2 parents 65941cb + 5d4b630 commit a162405
Showing 6 changed files with 210 additions and 25 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -146,7 +146,9 @@ below:
|`AnySAMInputFormat`|`hadoopbam.anysam.trust-exts`|`true`|Whether to detect the file format (BAM, SAM, or CRAM) by file extension. If `false`, use the file contents to detect the format.|
|`KeyIgnoringAnySAMOutputFormat`|`hadoopbam.anysam.output-format`| |(Required.) The file format to use when writing BAM, SAM, or CRAM files. Should be one of `BAM`, `SAM`, or `CRAM`.|
| |`hadoopbam.anysam.write-header`|`true`|Whether to write the SAM header in each output file part. If `true`, call `setSAMHeader()` or `readSAMHeaderFrom()` to set the desired header.|
|`BAMInputFormat`|`hadoopbam.bam.intervals`| |Only include reads that match the specified intervals. BAM files must be indexed in this case. Intervals are comma-separated and follow the same syntax as the `-L` option in SAMtools. E.g. `chr1:1-20000,chr2:12000-20000`.|
|`BAMInputFormat`|`hadoopbam.bam.bounded-traversal`|`false`|If `true`, only include reads that match the intervals specified in `hadoopbam.bam.intervals`, and unplaced unmapped reads, if `hadoopbam.bam.traverse-unplaced-unmapped` is set to `true`.|
| |`hadoopbam.bam.intervals`| |Only include reads that match the specified intervals. BAM files must be indexed in this case. Intervals are comma-separated and follow the same syntax as the `-L` option in SAMtools. E.g. `chr1:1-20000,chr2:12000-20000`. When using this option, `hadoopbam.bam.bounded-traversal` should be set to `true`.|
| |`hadoopbam.bam.traverse-unplaced-unmapped`|`false`|If `true`, include unplaced unmapped reads (that is, unmapped reads with no position). When using this option, `hadoopbam.bam.bounded-traversal` should be set to `true`.|
|`KeyIgnoringBAMOutputFormat`|`hadoopbam.bam.write-splitting-bai`|`false`|If `true`, write _.splitting-bai_ files for every BAM file.|
|`CRAMInputFormat`|`hadoopbam.cram.reference-source-path`| |(Required.) The path to the reference. May be an `hdfs://` path.|
|`FastqInputFormat`|`hbam.fastq-input.base-quality-encoding`|`sanger`|The encoding used for base qualities. One of `sanger` or `illumina`.|
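
For illustration, a minimal sketch of driving these properties directly from job-setup code (the property names and interval syntax come from the table above; the driver class is hypothetical, and the BAM files must be indexed for interval filtering to work):

```java
import org.apache.hadoop.conf.Configuration;

public class BoundedTraversalSetup {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Only include reads overlapping the intervals, plus unplaced unmapped reads.
    conf.setBoolean("hadoopbam.bam.bounded-traversal", true);
    conf.set("hadoopbam.bam.intervals", "chr1:1-20000,chr2:12000-20000");
    conf.setBoolean("hadoopbam.bam.traverse-unplaced-unmapped", true);
    return conf;
  }
}
```

Programmatic users should prefer `BAMInputFormat.setTraversalParameters()` (shown in the diff below), which sets the same properties and rejects unsupported combinations.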
136 changes: 125 additions & 11 deletions src/main/java/org/seqdoop/hadoop_bam/BAMInputFormat.java
@@ -22,6 +22,8 @@

package org.seqdoop.hadoop_bam;

import com.google.common.collect.ImmutableList;
import htsjdk.samtools.AbstractBAMFileIndex;
import htsjdk.samtools.BAMFileReader;
import htsjdk.samtools.BAMFileSpan;
import htsjdk.samtools.BAMIndex;
@@ -73,31 +75,96 @@ public class BAMInputFormat
// set this to true for debug output
public final static boolean DEBUG_BAM_SPLITTER = false;

/**
* If set to true, only include reads that overlap the given intervals (if specified),
* and unplaced unmapped reads (if specified). For programmatic use
* {@link #setTraversalParameters(Configuration, List, boolean)} should be preferred.
*/
public static final String BOUNDED_TRAVERSAL_PROPERTY = "hadoopbam.bam.bounded-traversal";

/**
* Filter by region, like <code>-L</code> in SAMtools. Takes a comma-separated
* list of intervals, e.g. <code>chr1:1-20000,chr2:12000-20000</code>. For
* programmatic use {@link #setIntervals(Configuration, List)} should be preferred.
*/
public static final String INTERVALS_PROPERTY = "hadoopbam.bam.intervals";

/**
* If set to true, include unplaced unmapped reads (that is, unmapped reads with no
* position). For programmatic use
* {@link #setTraversalParameters(Configuration, List, boolean)} should be preferred.
*/
public static final String TRAVERSE_UNPLACED_UNMAPPED_PROPERTY = "hadoopbam.bam.traverse-unplaced-unmapped";

/**
* Only include reads that overlap the given intervals. Unplaced unmapped reads are not
* included.
* @param conf the Hadoop configuration to set properties on
* @param intervals the intervals to filter by
* @param <T> the {@link Locatable} type
*/
public static <T extends Locatable> void setIntervals(Configuration conf,
List<T> intervals) {
StringBuilder sb = new StringBuilder();
for (Iterator<T> it = intervals.iterator(); it.hasNext(); ) {
Locatable l = it.next();
sb.append(String.format("%s:%d-%d", l.getContig(), l.getStart(), l.getEnd()));
if (it.hasNext()) {
sb.append(",");
setTraversalParameters(conf, intervals, false);
}

/**
* Only include reads that overlap the given intervals (if specified) and unplaced
* unmapped reads (if <code>true</code>).
* @param conf the Hadoop configuration to set properties on
* @param intervals the intervals to filter by, or <code>null</code> if all reads
* are to be included (in which case <code>traverseUnplacedUnmapped</code> must be
* <code>true</code>)
* @param traverseUnplacedUnmapped whether to include unplaced unmapped reads
* @param <T> the {@link Locatable} type
*/
public static <T extends Locatable> void setTraversalParameters(Configuration conf,
List<T> intervals, boolean traverseUnplacedUnmapped) {
if (intervals == null && !traverseUnplacedUnmapped) {
throw new IllegalArgumentException("Traversing mapped reads only is not supported.");
}
conf.setBoolean(BOUNDED_TRAVERSAL_PROPERTY, true);
if (intervals != null) {
StringBuilder sb = new StringBuilder();
for (Iterator<T> it = intervals.iterator(); it.hasNext(); ) {
Locatable l = it.next();
sb.append(String.format("%s:%d-%d", l.getContig(), l.getStart(), l.getEnd()));
if (it.hasNext()) {
sb.append(",");
}
}
conf.set(INTERVALS_PROPERTY, sb.toString());
}
conf.set(INTERVALS_PROPERTY, sb.toString());
conf.setBoolean(TRAVERSE_UNPLACED_UNMAPPED_PROPERTY, traverseUnplacedUnmapped);
}
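
A usage sketch for the new API (hypothetical driver code; the interval values mirror the tests in this change):

```java
import htsjdk.samtools.util.Interval;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.seqdoop.hadoop_bam.BAMInputFormat;

public class TraversalParametersDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    List<Interval> intervals = Arrays.asList(
        new Interval("chr21", 5000, 9999),
        new Interval("chr21", 20000, 22999));

    // Reads overlapping the intervals, plus unplaced unmapped reads.
    BAMInputFormat.setTraversalParameters(conf, intervals, true);

    // Reset, then select unplaced unmapped reads only.
    BAMInputFormat.unsetTraversalParameters(conf);
    BAMInputFormat.setTraversalParameters(conf, null, true);

    // Intervals only; equivalent to setIntervals(conf, intervals).
    BAMInputFormat.unsetTraversalParameters(conf);
    BAMInputFormat.setTraversalParameters(conf, intervals, false);

    // setTraversalParameters(conf, null, false) would throw
    // IllegalArgumentException: traversing mapped reads only is unsupported.
  }
}
```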

/**
* Reset traversal parameters so that all reads are included.
* @param conf the Hadoop configuration to set properties on
*/
public static void unsetTraversalParameters(Configuration conf) {
conf.unset(BOUNDED_TRAVERSAL_PROPERTY);
conf.unset(INTERVALS_PROPERTY);
conf.unset(TRAVERSE_UNPLACED_UNMAPPED_PROPERTY);
}

static boolean isBoundedTraversal(Configuration conf) {
return conf.getBoolean(BOUNDED_TRAVERSAL_PROPERTY, false) ||
conf.get(INTERVALS_PROPERTY) != null; // backwards compatibility
}
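
The disjunction above keeps pre-existing jobs working: setting only the legacy intervals property still implies a bounded traversal. A hypothetical same-package test (these helpers are package-private) makes that concrete:

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

// Must live in the org.seqdoop.hadoop_bam package to see the
// package-private helpers.
public class TestLegacyIntervalsProperty {
  @Test
  public void intervalsPropertyAloneImpliesBoundedTraversal() {
    Configuration conf = new Configuration();
    conf.set(BAMInputFormat.INTERVALS_PROPERTY, "chr21:5000-9999");
    assertTrue(BAMInputFormat.isBoundedTraversal(conf));
    assertFalse(BAMInputFormat.traverseUnplacedUnmapped(conf));
  }
}
```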

static boolean traverseUnplacedUnmapped(Configuration conf) {
return conf.getBoolean(TRAVERSE_UNPLACED_UNMAPPED_PROPERTY, false);
}

static List<Interval> getIntervals(Configuration conf) {
String intervalsProperty = conf.get(INTERVALS_PROPERTY);
if (intervalsProperty == null) {
return null;
}
if (intervalsProperty.isEmpty()) {
return ImmutableList.of();
}
List<Interval> intervals = new ArrayList<Interval>();
for (String s : intervalsProperty.split(",")) {
String[] parts = s.split(":|-");
@@ -297,8 +364,7 @@ private int addProbabilisticSplits(

private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration conf)
throws IOException {
List<Interval> intervals = getIntervals(conf);
if (intervals == null) {
if (!isBoundedTraversal(conf)) {
return splits;
}

@@ -314,6 +380,12 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
.setOption(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES, true)
.setOption(SamReaderFactory.Option.EAGERLY_DECODE, false)
.setUseAsyncIo(false);

List<Interval> intervals = getIntervals(conf);

Map<Path, Long> fileToUnmapped = new LinkedHashMap<>();
boolean traverseUnplacedUnmapped = traverseUnplacedUnmapped(conf);

for (Path bamFile : bamFiles) {
FileSystem fs = bamFile.getFileSystem(conf);

@@ -328,9 +400,23 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
SAMFileHeader header = SAMHeaderReader.readSAMHeaderFrom(in, conf);
SAMSequenceDictionary dict = header.getSequenceDictionary();
BAMIndex idx = samReader.indexing().getIndex();
QueryInterval[] queryIntervals = prepareQueryIntervals(intervals, dict);
fileToSpan.put(bamFile, BAMFileReader.getFileSpan(queryIntervals, idx));

if (intervals != null && !intervals.isEmpty()) {
QueryInterval[] queryIntervals = prepareQueryIntervals(intervals, dict);
fileToSpan.put(bamFile, BAMFileReader.getFileSpan(queryIntervals, idx));
}

if (traverseUnplacedUnmapped) {
long startOfLastLinearBin = idx.getStartOfLastLinearBin();
long noCoordinateCount = ((AbstractBAMFileIndex) idx).getNoCoordinateCount();
if (startOfLastLinearBin != -1 && noCoordinateCount > 0) {
// add FileVirtualSplit (with no intervals) from startOfLastLinearBin to
// end of file
fileToUnmapped.put(bamFile, startOfLastLinearBin);
}
}
}

}
}

@@ -342,13 +428,41 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
long splitEnd = virtualSplit.getEndVirtualOffset();
BAMFileSpan splitSpan = new BAMFileSpan(new Chunk(splitStart, splitEnd));
BAMFileSpan span = fileToSpan.get(virtualSplit.getPath());
if (span == null) {
continue;
}
span = (BAMFileSpan) span.removeContentsBefore(splitSpan);
span = (BAMFileSpan) span.removeContentsAfter(splitSpan);
if (!span.getChunks().isEmpty()) {
filteredSplits.add(new FileVirtualSplit(virtualSplit.getPath(), splitStart, splitEnd,
virtualSplit.getLocations(), span.toCoordinateArray()));
}
}

if (traverseUnplacedUnmapped) {
// add extra splits that contain only unmapped reads
for (Map.Entry<Path, Long> e : fileToUnmapped.entrySet()) {
Path file = e.getKey();
long unmappedStart = e.getValue();
boolean foundFirstSplit = false;
for (InputSplit split : splits) { // TODO: are splits in order of start position?
FileVirtualSplit virtualSplit = (FileVirtualSplit) split;
if (virtualSplit.getPath().equals(file)) {
long splitStart = virtualSplit.getStartVirtualOffset();
long splitEnd = virtualSplit.getEndVirtualOffset();
if (foundFirstSplit) {
filteredSplits.add(new FileVirtualSplit(virtualSplit.getPath(), splitStart, splitEnd,
virtualSplit.getLocations()));
} else if (splitStart <= unmappedStart && unmappedStart <= splitEnd) {
filteredSplits.add(new FileVirtualSplit(virtualSplit.getPath(), unmappedStart, splitEnd,
virtualSplit.getLocations()));
foundFirstSplit = true;
}
}
}
}
}

return filteredSplits;
}
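
The index arithmetic behind the unmapped splits can be seen in isolation with plain htsjdk (no Hadoop): in a coordinate-sorted BAM, unplaced unmapped reads are stored after the last mapped read, so the region from `getStartOfLastLinearBin()` to end of file contains them, and the index's no-coordinate count says whether any exist at all. A self-contained sketch, assuming a hypothetical indexed, coordinate-sorted `example.bam`:

```java
import htsjdk.samtools.AbstractBAMFileIndex;
import htsjdk.samtools.BAMIndex;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordIterator;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import java.io.File;
import java.io.IOException;

public class UnmappedRegionDemo {
  public static void main(String[] args) throws IOException {
    try (SamReader reader =
        SamReaderFactory.makeDefault().open(new File("example.bam"))) {
      BAMIndex idx = reader.indexing().getIndex();
      // Virtual file offset after which the unplaced unmapped reads live.
      long unmappedStart = idx.getStartOfLastLinearBin();
      // Count of reads with no coordinates, from the index metadata.
      long noCoordinateCount = ((AbstractBAMFileIndex) idx).getNoCoordinateCount();
      if (unmappedStart != -1 && noCoordinateCount > 0) {
        try (SAMRecordIterator it = reader.queryUnmapped()) {
          while (it.hasNext()) {
            SAMRecord r = it.next();
            System.out.println(r.getReadName()); // unmapped, no position
          }
        }
      }
    }
  }
}
```

`BAMRecordReader` takes the same shortcut below: for the unmapped-only splits (those carrying no interval file pointers) it simply calls `queryUnmapped()` on the reader.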

10 changes: 8 additions & 2 deletions src/main/java/org/seqdoop/hadoop_bam/BAMRecordReader.java
@@ -163,11 +163,17 @@ public static long getKey0(int refIdx, int alignmentStart0) {
throw new IllegalArgumentException("Property hadoopbam.bam.keep-paired-reads-together is no longer honored.");
}

List<Interval> intervals = BAMInputFormat.getIntervals(conf);
if (intervals != null) {
boolean boundedTraversal = BAMInputFormat.isBoundedTraversal(conf);
if (boundedTraversal && split.getIntervalFilePointers() != null) {
// return reads for intervals
List<Interval> intervals = BAMInputFormat.getIntervals(conf);
QueryInterval[] queryIntervals = BAMInputFormat.prepareQueryIntervals(intervals, header.getSequenceDictionary());
iterator = bamFileReader.createIndexIterator(queryIntervals, false, split.getIntervalFilePointers());
} else if (boundedTraversal && split.getIntervalFilePointers() == null) {
// return unmapped reads
iterator = bamFileReader.queryUnmapped();
} else {
// return everything
BAMFileSpan splitSpan = new BAMFileSpan(new Chunk(virtualStart, virtualEnd));
iterator = bamFileReader.getIterator(splitSpan);
}
7 changes: 6 additions & 1 deletion src/test/java/org/seqdoop/hadoop_bam/BAMTestUtil.java
@@ -30,12 +30,17 @@ public static File writeBamFile(int numPairs, SAMFileHeader.SortOrder sortOrder)
false, true, null,
null,
-1, false);

} else {
samRecordSetBuilder.addPair(String.format("test-read-%03d", i), chr, start1,
start2);
}
}
if (numPairs > 0) { // add two unplaced unmapped fragments if non-empty
samRecordSetBuilder.addUnmappedFragment(String.format
("test-read-%03d-unplaced-unmapped", numPairs++));
samRecordSetBuilder.addUnmappedFragment(String.format
("test-read-%03d-unplaced-unmapped", numPairs++));
}

final File bamFile = File.createTempFile("test", ".bam");
bamFile.deleteOnExit();
76 changes: 67 additions & 9 deletions src/test/java/org/seqdoop/hadoop_bam/TestBAMInputFormat.java
@@ -25,12 +25,26 @@ public class TestBAMInputFormat {
private String input;
private TaskAttemptContext taskAttemptContext;
private JobContext jobContext;

private void completeSetup(List<Interval> intervals) {

private void completeSetup() {
completeSetup(false, null, true);
}

private void completeSetupWithIntervals(List<Interval> intervals) {
completeSetupWithBoundedTraversal(intervals, false);
}

private void completeSetupWithBoundedTraversal(List<Interval> intervals, boolean
traverseUnplacedUnmapped) {
completeSetup(true, intervals, traverseUnplacedUnmapped);
}

private void completeSetup(boolean boundedTraversal, List<Interval> intervals, boolean
traverseUnplacedUnmapped) {
Configuration conf = new Configuration();
conf.set("mapred.input.dir", "file://" + input);
if (intervals != null) {
BAMInputFormat.setIntervals(conf, intervals);
if (boundedTraversal) {
BAMInputFormat.setTraversalParameters(conf, intervals, traverseUnplacedUnmapped);
}
taskAttemptContext = new TaskAttemptContextImpl(conf, mock(TaskAttemptID.class));
jobContext = new JobContextImpl(conf, taskAttemptContext.getJobID());
@@ -39,7 +53,7 @@ private void completeSetup(List<Interval> intervals) {
@Test
public void testNoReadsInFirstSplitBug() throws Exception {
input = BAMTestUtil.writeBamFileWithLargeHeader().getAbsolutePath();
completeSetup(null);
completeSetup();
BAMInputFormat inputFormat = new BAMInputFormat();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
assertEquals(1, splits.size());
@@ -49,15 +63,15 @@ public void testNoReadsInFirstSplitBug() throws Exception {
public void testMultipleSplits() throws Exception {
input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.queryname)
.getAbsolutePath();
completeSetup(null);
completeSetup();
jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
BAMInputFormat inputFormat = new BAMInputFormat();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
assertEquals(2, splits.size());
List<SAMRecord> split0Records = getSAMRecordsFromSplit(inputFormat, splits.get(0));
List<SAMRecord> split1Records = getSAMRecordsFromSplit(inputFormat, splits.get(1));
assertEquals(1577, split0Records.size());
assertEquals(423, split1Records.size());
assertEquals(425, split1Records.size());
SAMRecord lastRecordOfSplit0 = split0Records.get(split0Records.size() - 1);
SAMRecord firstRecordOfSplit1 = split1Records.get(0);
assertEquals(lastRecordOfSplit0.getReadName(), firstRecordOfSplit1.getReadName());
@@ -73,7 +87,7 @@ public void testIntervals() throws Exception {
intervals.add(new Interval("chr21", 5000, 9999)); // includes two unpaired fragments
intervals.add(new Interval("chr21", 20000, 22999));

completeSetup(intervals);
completeSetupWithIntervals(intervals);

jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
BAMInputFormat inputFormat = new BAMInputFormat();
@@ -90,7 +104,7 @@ public void testIntervalCoveringWholeChromosome() throws Exception {
List<Interval> intervals = new ArrayList<Interval>();
intervals.add(new Interval("chr21", 1, 1000135));

completeSetup(intervals);
completeSetupWithIntervals(intervals);

jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
BAMInputFormat inputFormat = new BAMInputFormat();
@@ -102,6 +116,50 @@ public void testIntervalCoveringWholeChromosome() throws Exception {
assertEquals(423, split1Records.size());
}

@Test
public void testIntervalsAndUnmapped() throws Exception {
input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.coordinate)
.getAbsolutePath();
List<Interval> intervals = new ArrayList<Interval>();
intervals.add(new Interval("chr21", 5000, 9999)); // includes two unpaired fragments
intervals.add(new Interval("chr21", 20000, 22999));

completeSetupWithBoundedTraversal(intervals, true);

jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
BAMInputFormat inputFormat = new BAMInputFormat();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
assertEquals(2, splits.size());
List<SAMRecord> split0Records = getSAMRecordsFromSplit(inputFormat, splits.get(0));
List<SAMRecord> split1Records = getSAMRecordsFromSplit(inputFormat, splits.get(1));
assertEquals(16, split0Records.size());
assertEquals(2, split1Records.size());
}

@Test
public void testUnmapped() throws Exception {
input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.coordinate)
.getAbsolutePath();

completeSetupWithBoundedTraversal(null, true);

jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
BAMInputFormat inputFormat = new BAMInputFormat();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
assertEquals(1, splits.size());
List<SAMRecord> split0Records = getSAMRecordsFromSplit(inputFormat, splits.get(0));
assertEquals(2, split0Records.size());
}

@Test(expected = IllegalArgumentException.class)
public void testMappedOnly() throws Exception {
input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.coordinate)
.getAbsolutePath();

// Mapped only (-XL unmapped) is currently unsupported and throws an exception.
completeSetupWithBoundedTraversal(null, false);
}

private List<SAMRecord> getSAMRecordsFromSplit(BAMInputFormat inputFormat,
InputSplit split) throws Exception {
RecordReader<LongWritable, SAMRecordWritable> reader = inputFormat
@@ -201,7 +201,7 @@ public void testBAMWithSplittingBai() throws Exception {
new SAMRecordSetBuilder(true, SAMFileHeader.SortOrder.coordinate).getHeader());

final int actualCount = getBAMRecordCount(outFile);
assertEquals(numPairs * 2, actualCount);
assertEquals(numPairs * 2 + 2, actualCount); // 2 unmapped reads

File splittingBai = new File(outFile.getParentFile(), outFile.getName() +
SplittingBAMIndexer.OUTPUT_FILE_EXTENSION);
