From 5d4b630956abf39e5f1daaebb507d4bffbe27b66 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Thu, 13 Apr 2017 16:29:11 +0100
Subject: [PATCH] Add a way to get unmapped reads from a BAM.

---
 README.md                                          |   4 +-
 .../seqdoop/hadoop_bam/BAMInputFormat.java         | 136 ++++++++++++++++--
 .../seqdoop/hadoop_bam/BAMRecordReader.java        |  10 +-
 .../org/seqdoop/hadoop_bam/BAMTestUtil.java        |   7 +-
 .../hadoop_bam/TestBAMInputFormat.java             |  76 ++++++++--
 .../hadoop_bam/TestBAMOutputFormat.java            |   2 +-
 6 files changed, 210 insertions(+), 25 deletions(-)
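Usage note for reviewers: a minimal sketch of the new programmatic API this patch
introduces (the interval values are illustrative only):

    import htsjdk.samtools.util.Interval;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.seqdoop.hadoop_bam.BAMInputFormat;

    Configuration conf = new Configuration();
    List<Interval> intervals = new ArrayList<>();
    intervals.add(new Interval("chr21", 5000, 9999));
    // Reads overlapping the intervals, plus unplaced unmapped reads:
    BAMInputFormat.setTraversalParameters(conf, intervals, true);
    // Unplaced unmapped reads only:
    BAMInputFormat.setTraversalParameters(conf, null, true);

The sketch assumes the setTraversalParameters signature added to BAMInputFormat.java
below; setIntervals remains available for interval-only traversal.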
diff --git a/README.md b/README.md
index 796b2f0..b4f189d 100644
--- a/README.md
+++ b/README.md
@@ -146,7 +146,9 @@ below:
 |`AnySAMInputFormat`|`hadoopbam.anysam.trust-exts`|`true`|Whether to detect the file format (BAM, SAM, or CRAM) by file extension. If `false`, use the file contents to detect the format.|
 |`KeyIgnoringAnySAMOutputFormat`|`hadoopbam.anysam.output-format`| |(Required.) The file format to use when writing BAM, SAM, or CRAM files. Should be one of `BAM`, `SAM`, or `CRAM`.|
 | |`hadoopbam.anysam.write-header`|`true`|Whether to write the SAM header in each output file part. If `true`, call `setSAMHeader()` or `readSAMHeaderFrom()` to set the desired header.|
-|`BAMInputFormat`|`hadoopbam.bam.intervals`| |Only include reads that match the specified intervals. BAM files must be indexed in this case. Intervals are comma-separated and follow the same syntax as the `-L` option in SAMtools. E.g. `chr1:1-20000,chr2:12000-20000`.|
+|`BAMInputFormat`|`hadoopbam.bam.bounded-traversal`|`false`|If `true`, only include reads that match the intervals specified in `hadoopbam.bam.intervals`, plus unplaced unmapped reads if `hadoopbam.bam.traverse-unplaced-unmapped` is set to `true`.|
+| |`hadoopbam.bam.intervals`| |Only include reads that match the specified intervals. BAM files must be indexed in this case. Intervals are comma-separated and follow the same syntax as the `-L` option in SAMtools. E.g. `chr1:1-20000,chr2:12000-20000`. When using this option, `hadoopbam.bam.bounded-traversal` should be set to `true`.|
+| |`hadoopbam.bam.traverse-unplaced-unmapped`|`false`|If `true`, include unplaced unmapped reads (that is, unmapped reads with no position). When using this option, `hadoopbam.bam.bounded-traversal` should be set to `true`.|
 |`KeyIgnoringBAMOutputFormat`|`hadoopbam.bam.write-splitting-bai`|`false`|If `true`, write _.splitting-bai_ files for every BAM file.|
 |`CRAMInputFormat`|`hadoopbam.cram.reference-source-path`| |(Required.) The path to the reference. May be an `hdfs://` path.|
 |`FastqInputFormat`|`hbam.fastq-input.base-quality-encoding`|`sanger`|The encoding used for base qualities. One of `sanger` or `illumina`.|
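The same traversal can also be requested through the raw properties documented in the
table above; a sketch (values illustrative; the javadoc below recommends the
programmatic setters over raw properties):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    conf.setBoolean("hadoopbam.bam.bounded-traversal", true);
    conf.set("hadoopbam.bam.intervals", "chr1:1-20000,chr2:12000-20000");
    conf.setBoolean("hadoopbam.bam.traverse-unplaced-unmapped", true);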
diff --git a/src/main/java/org/seqdoop/hadoop_bam/BAMInputFormat.java b/src/main/java/org/seqdoop/hadoop_bam/BAMInputFormat.java
index 6f0b8d6..66c02ac 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/BAMInputFormat.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/BAMInputFormat.java
@@ -22,6 +22,8 @@
 
 package org.seqdoop.hadoop_bam;
 
+import com.google.common.collect.ImmutableList;
+import htsjdk.samtools.AbstractBAMFileIndex;
 import htsjdk.samtools.BAMFileReader;
 import htsjdk.samtools.BAMFileSpan;
 import htsjdk.samtools.BAMIndex;
@@ -73,6 +75,13 @@ public class BAMInputFormat
   // set this to true for debug output
   public final static boolean DEBUG_BAM_SPLITTER = false;
 
+  /**
+   * If set to true, only include reads that overlap the given intervals (if specified),
+   * and unplaced unmapped reads (if specified). For programmatic use,
+   * {@link #setTraversalParameters(Configuration, List, boolean)} should be preferred.
+   */
+  public static final String BOUNDED_TRAVERSAL_PROPERTY =
+      "hadoopbam.bam.bounded-traversal";
+
   /**
    * Filter by region, like -L in SAMtools. Takes a comma-separated
    * list of intervals, e.g. chr1:1-20000,chr2:12000-20000. For
@@ -80,17 +89,72 @@ public class BAMInputFormat
    */
   public static final String INTERVALS_PROPERTY = "hadoopbam.bam.intervals";
 
+  /**
+   * If set to true, include unplaced unmapped reads (that is, unmapped reads with no
+   * position). For programmatic use,
+   * {@link #setTraversalParameters(Configuration, List, boolean)} should be preferred.
+   */
+  public static final String TRAVERSE_UNPLACED_UNMAPPED_PROPERTY =
+      "hadoopbam.bam.traverse-unplaced-unmapped";
+
+  /**
+   * Only include reads that overlap the given intervals. Unplaced unmapped reads are
+   * not included.
+   * @param conf the Hadoop configuration to set properties on
+   * @param intervals the intervals to filter by
+   * @param <T> the {@link Locatable} type
+   */
   public static <T extends Locatable> void setIntervals(Configuration conf,
       List<T> intervals) {
-    StringBuilder sb = new StringBuilder();
-    for (Iterator<T> it = intervals.iterator(); it.hasNext(); ) {
-      Locatable l = it.next();
-      sb.append(String.format("%s:%d-%d", l.getContig(), l.getStart(), l.getEnd()));
-      if (it.hasNext()) {
-        sb.append(",");
+    setTraversalParameters(conf, intervals, false);
+  }
+
+  /**
+   * Only include reads that overlap the given intervals (if specified) and unplaced
+   * unmapped reads (if <code>traverseUnplacedUnmapped</code> is true).
+   * @param conf the Hadoop configuration to set properties on
+   * @param intervals the intervals to filter by, or <code>null</code> if all reads
+   * are to be included (in which case <code>traverseUnplacedUnmapped</code> must be
+   * <code>true</code>)
+   * @param traverseUnplacedUnmapped whether to include unplaced unmapped reads
+   * @param <T> the {@link Locatable} type
+   */
+  public static <T extends Locatable> void setTraversalParameters(Configuration conf,
+      List<T> intervals, boolean traverseUnplacedUnmapped) {
+    if (intervals == null && !traverseUnplacedUnmapped) {
+      throw new IllegalArgumentException("Traversing mapped reads only is not supported.");
+    }
+    conf.setBoolean(BOUNDED_TRAVERSAL_PROPERTY, true);
+    if (intervals != null) {
+      StringBuilder sb = new StringBuilder();
+      for (Iterator<T> it = intervals.iterator(); it.hasNext(); ) {
+        Locatable l = it.next();
+        sb.append(String.format("%s:%d-%d", l.getContig(), l.getStart(), l.getEnd()));
+        if (it.hasNext()) {
+          sb.append(",");
+        }
       }
+      conf.set(INTERVALS_PROPERTY, sb.toString());
     }
-    conf.set(INTERVALS_PROPERTY, sb.toString());
+    conf.setBoolean(TRAVERSE_UNPLACED_UNMAPPED_PROPERTY, traverseUnplacedUnmapped);
+  }
+
+  /**
+   * Reset traversal parameters so that all reads are included.
+   * @param conf the Hadoop configuration to set properties on
+   */
+  public static void unsetTraversalParameters(Configuration conf) {
+    conf.unset(BOUNDED_TRAVERSAL_PROPERTY);
+    conf.unset(INTERVALS_PROPERTY);
+    conf.unset(TRAVERSE_UNPLACED_UNMAPPED_PROPERTY);
+  }
+
+  static boolean isBoundedTraversal(Configuration conf) {
+    return conf.getBoolean(BOUNDED_TRAVERSAL_PROPERTY, false) ||
+        conf.get(INTERVALS_PROPERTY) != null; // backwards compatibility
+  }
+
+  static boolean traverseUnplacedUnmapped(Configuration conf) {
+    return conf.getBoolean(TRAVERSE_UNPLACED_UNMAPPED_PROPERTY, false);
   }
 
   static List<Interval> getIntervals(Configuration conf) {
@@ -98,6 +162,9 @@ static List<Interval> getIntervals(Configuration conf) {
     if (intervalsProperty == null) {
       return null;
     }
+    if (intervalsProperty.isEmpty()) {
+      return ImmutableList.of();
+    }
     List<Interval> intervals = new ArrayList<Interval>();
     for (String s : intervalsProperty.split(",")) {
       String[] parts = s.split(":|-");
@@ -297,8 +364,7 @@ private int addProbabilisticSplits(
 
   private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration conf)
       throws IOException {
-    List<Interval> intervals = getIntervals(conf);
-    if (intervals == null) {
+    if (!isBoundedTraversal(conf)) {
       return splits;
     }
@@ -314,6 +380,12 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
         .setOption(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES, true)
         .setOption(SamReaderFactory.Option.EAGERLY_DECODE, false)
         .setUseAsyncIo(false);
+
+    List<Interval> intervals = getIntervals(conf);
+
+    Map<Path, Long> fileToUnmapped = new LinkedHashMap<>();
+    boolean traverseUnplacedUnmapped = traverseUnplacedUnmapped(conf);
+
 
     for (Path bamFile : bamFiles) {
       FileSystem fs = bamFile.getFileSystem(conf);
@@ -326,8 +398,21 @@
         SAMFileHeader header = SAMHeaderReader.readSAMHeaderFrom(in, conf);
         SAMSequenceDictionary dict = header.getSequenceDictionary();
         BAMIndex idx = samReader.indexing().getIndex();
-        QueryInterval[] queryIntervals = prepareQueryIntervals(intervals, dict);
-        fileToSpan.put(bamFile, BAMFileReader.getFileSpan(queryIntervals, idx));
+
+        if (intervals != null && !intervals.isEmpty()) {
+          QueryInterval[] queryIntervals = prepareQueryIntervals(intervals, dict);
+          fileToSpan.put(bamFile, BAMFileReader.getFileSpan(queryIntervals, idx));
+        }
+
+        if (traverseUnplacedUnmapped) {
+          long startOfLastLinearBin = idx.getStartOfLastLinearBin();
+          long noCoordinateCount = ((AbstractBAMFileIndex) idx).getNoCoordinateCount();
+          if (startOfLastLinearBin != -1 && noCoordinateCount > 0) {
+            // add FileVirtualSplit (with no intervals) from startOfLastLinearBin to
+            // end of file
+            fileToUnmapped.put(bamFile, startOfLastLinearBin);
+          }
+        }
       }
     }
   }
@@ -342,6 +428,9 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
       long splitStart = virtualSplit.getStartVirtualOffset();
       long splitEnd = virtualSplit.getEndVirtualOffset();
       BAMFileSpan splitSpan = new BAMFileSpan(new Chunk(splitStart, splitEnd));
       BAMFileSpan span = fileToSpan.get(virtualSplit.getPath());
+      if (span == null) {
+        continue;
+      }
       span = (BAMFileSpan) span.removeContentsBefore(splitSpan);
       span = (BAMFileSpan) span.removeContentsAfter(splitSpan);
@@ -349,6 +438,31 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
           virtualSplit.getLocations(), span.toCoordinateArray()));
       }
     }
+
+    if (traverseUnplacedUnmapped) {
+      // add extra splits that contain only unmapped reads
+      for (Map.Entry<Path, Long> e : fileToUnmapped.entrySet()) {
+        Path file = e.getKey();
+        long unmappedStart = e.getValue();
+        boolean foundFirstSplit = false;
+        for (InputSplit split : splits) { // TODO: are splits in order of start position?
+          FileVirtualSplit virtualSplit = (FileVirtualSplit) split;
+          if (virtualSplit.getPath().equals(file)) {
+            long splitStart = virtualSplit.getStartVirtualOffset();
+            long splitEnd = virtualSplit.getEndVirtualOffset();
+            if (foundFirstSplit) {
+              filteredSplits.add(new FileVirtualSplit(virtualSplit.getPath(), splitStart,
+                  splitEnd, virtualSplit.getLocations()));
+            } else if (splitStart <= unmappedStart && unmappedStart <= splitEnd) {
+              filteredSplits.add(new FileVirtualSplit(virtualSplit.getPath(), unmappedStart,
+                  splitEnd, virtualSplit.getLocations()));
+              foundFirstSplit = true;
+            }
+          }
+        }
+      }
+    }
+
     return filteredSplits;
   }
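Note on the split arithmetic above: splitStart, splitEnd, and unmappedStart are BGZF
virtual file offsets, which pack the compressed block address into the high 48 bits and
the offset within the uncompressed block into the low 16 bits, so plain long
comparisons order file positions correctly. A sketch using htsjdk's existing
BlockCompressedFilePointerUtil (standard htsjdk, not touched by this patch):

    import htsjdk.samtools.util.BlockCompressedFilePointerUtil;

    // Pack: BGZF block at compressed byte 65536, record 17 bytes into the
    // uncompressed block.
    long vo = BlockCompressedFilePointerUtil.makeFilePointer(65536L, 17);
    System.out.println(BlockCompressedFilePointerUtil.getBlockAddress(vo)); // 65536
    System.out.println(BlockCompressedFilePointerUtil.getBlockOffset(vo));  // 17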
diff --git a/src/main/java/org/seqdoop/hadoop_bam/BAMRecordReader.java b/src/main/java/org/seqdoop/hadoop_bam/BAMRecordReader.java
index 606bf43..7076737 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/BAMRecordReader.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/BAMRecordReader.java
@@ -163,11 +163,17 @@ public static long getKey0(int refIdx, int alignmentStart0) {
       throw new IllegalArgumentException("Property hadoopbam.bam.keep-paired-reads-together is no longer honored.");
     }
 
-    List<Interval> intervals = BAMInputFormat.getIntervals(conf);
-    if (intervals != null) {
+    boolean boundedTraversal = BAMInputFormat.isBoundedTraversal(conf);
+    if (boundedTraversal && split.getIntervalFilePointers() != null) {
+      // return reads for intervals
+      List<Interval> intervals = BAMInputFormat.getIntervals(conf);
       QueryInterval[] queryIntervals = BAMInputFormat.prepareQueryIntervals(intervals,
           header.getSequenceDictionary());
       iterator = bamFileReader.createIndexIterator(queryIntervals, false,
           split.getIntervalFilePointers());
+    } else if (boundedTraversal && split.getIntervalFilePointers() == null) {
+      // return unmapped reads
+      iterator = bamFileReader.queryUnmapped();
     } else {
+      // return everything
       BAMFileSpan splitSpan = new BAMFileSpan(new Chunk(virtualStart, virtualEnd));
       iterator = bamFileReader.getIterator(splitSpan);
     }
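For reference, the unmapped-only splits above hand back whatever queryUnmapped()
yields: reads that are unmapped and carry no reference position at all. A hedged
record-level sketch of that definition (the helper name is illustrative, not part of
the patch):

    import htsjdk.samtools.SAMRecord;

    // Unmapped reads placed at their mate's position still have a reference
    // index; unplaced unmapped reads do not.
    static boolean isUnplacedUnmapped(SAMRecord r) {
      return r.getReadUnmappedFlag()
          && r.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX;
    }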
diff --git a/src/test/java/org/seqdoop/hadoop_bam/BAMTestUtil.java b/src/test/java/org/seqdoop/hadoop_bam/BAMTestUtil.java
index 96cff8d..dda53a1 100644
--- a/src/test/java/org/seqdoop/hadoop_bam/BAMTestUtil.java
+++ b/src/test/java/org/seqdoop/hadoop_bam/BAMTestUtil.java
@@ -30,12 +30,17 @@ public static File writeBamFile(int numPairs, SAMFileHeader.SortOrder sortOrder)
             false, true, null, null, -1,
             false);
-
       } else {
         samRecordSetBuilder.addPair(String.format("test-read-%03d", i), chr, start1,
             start2);
       }
     }
+    if (numPairs > 0) { // add two unplaced unmapped fragments if non-empty
+      samRecordSetBuilder.addUnmappedFragment(
+          String.format("test-read-%03d-unplaced-unmapped", numPairs++));
+      samRecordSetBuilder.addUnmappedFragment(
+          String.format("test-read-%03d-unplaced-unmapped", numPairs++));
+    }
 
     final File bamFile = File.createTempFile("test", ".bam");
     bamFile.deleteOnExit();
diff --git a/src/test/java/org/seqdoop/hadoop_bam/TestBAMInputFormat.java b/src/test/java/org/seqdoop/hadoop_bam/TestBAMInputFormat.java
index 1a86a51..9982ae2 100644
--- a/src/test/java/org/seqdoop/hadoop_bam/TestBAMInputFormat.java
+++ b/src/test/java/org/seqdoop/hadoop_bam/TestBAMInputFormat.java
@@ -25,12 +25,26 @@ public class TestBAMInputFormat {
   private String input;
   private TaskAttemptContext taskAttemptContext;
   private JobContext jobContext;
-
-  private void completeSetup(List<Interval> intervals) {
+
+  private void completeSetup() {
+    completeSetup(false, null, true);
+  }
+
+  private void completeSetupWithIntervals(List<Interval> intervals) {
+    completeSetupWithBoundedTraversal(intervals, false);
+  }
+
+  private void completeSetupWithBoundedTraversal(List<Interval> intervals,
+      boolean traverseUnplacedUnmapped) {
+    completeSetup(true, intervals, traverseUnplacedUnmapped);
+  }
+
+  private void completeSetup(boolean boundedTraversal, List<Interval> intervals,
+      boolean traverseUnplacedUnmapped) {
     Configuration conf = new Configuration();
     conf.set("mapred.input.dir", "file://" + input);
-    if (intervals != null) {
-      BAMInputFormat.setIntervals(conf, intervals);
+    if (boundedTraversal) {
+      BAMInputFormat.setTraversalParameters(conf, intervals, traverseUnplacedUnmapped);
     }
     taskAttemptContext = new TaskAttemptContextImpl(conf, mock(TaskAttemptID.class));
     jobContext = new JobContextImpl(conf, taskAttemptContext.getJobID());
@@ -39,7 +53,7 @@ private void completeSetup(List<Interval> intervals) {
   @Test
   public void testNoReadsInFirstSplitBug() throws Exception {
     input = BAMTestUtil.writeBamFileWithLargeHeader().getAbsolutePath();
-    completeSetup(null);
+    completeSetup();
     BAMInputFormat inputFormat = new BAMInputFormat();
     List<InputSplit> splits = inputFormat.getSplits(jobContext);
     assertEquals(1, splits.size());
@@ -49,7 +63,7 @@ public void testNoReadsInFirstSplitBug() throws Exception {
   @Test
   public void testMultipleSplits() throws Exception {
     input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.queryname)
         .getAbsolutePath();
-    completeSetup(null);
+    completeSetup();
     jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
     BAMInputFormat inputFormat = new BAMInputFormat();
@@ -57,7 +71,7 @@ public void testMultipleSplits() throws Exception {
     List<SAMRecord> split0Records = getSAMRecordsFromSplit(inputFormat, splits.get(0));
     List<SAMRecord> split1Records = getSAMRecordsFromSplit(inputFormat, splits.get(1));
     assertEquals(1577, split0Records.size());
-    assertEquals(423, split1Records.size());
+    assertEquals(425, split1Records.size());
     SAMRecord lastRecordOfSplit0 = split0Records.get(split0Records.size() - 1);
     SAMRecord firstRecordOfSplit1 = split1Records.get(0);
     assertEquals(lastRecordOfSplit0.getReadName(), firstRecordOfSplit1.getReadName());
@@ -73,7 +87,7 @@ public void testIntervals() throws Exception {
     intervals.add(new Interval("chr21", 5000, 9999)); // includes two unpaired fragments
     intervals.add(new Interval("chr21", 20000, 22999));
 
-    completeSetup(intervals);
+    completeSetupWithIntervals(intervals);
 
     jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
     BAMInputFormat inputFormat = new BAMInputFormat();
@@ -90,7 +104,7 @@ public void testIntervalCoveringWholeChromosome() throws Exception {
     List<Interval> intervals = new ArrayList<Interval>();
     intervals.add(new Interval("chr21", 1, 1000135));
 
-    completeSetup(intervals);
+    completeSetupWithIntervals(intervals);
 
     jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
     BAMInputFormat inputFormat = new BAMInputFormat();
@@ -102,6 +116,50 @@ public void testIntervalCoveringWholeChromosome() throws Exception {
     assertEquals(423, split1Records.size());
   }
 
+  @Test
+  public void testIntervalsAndUnmapped() throws Exception {
+    input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.coordinate)
+        .getAbsolutePath();
+    List<Interval> intervals = new ArrayList<Interval>();
+    intervals.add(new Interval("chr21", 5000, 9999)); // includes two unpaired fragments
+    intervals.add(new Interval("chr21", 20000, 22999));
+
+    completeSetupWithBoundedTraversal(intervals, true);
+
+    jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
+    BAMInputFormat inputFormat = new BAMInputFormat();
+    List<InputSplit> splits = inputFormat.getSplits(jobContext);
+    assertEquals(2, splits.size());
+    List<SAMRecord> split0Records = getSAMRecordsFromSplit(inputFormat, splits.get(0));
+    List<SAMRecord> split1Records = getSAMRecordsFromSplit(inputFormat, splits.get(1));
+    assertEquals(16, split0Records.size());
+    assertEquals(2, split1Records.size());
+  }
+
+  @Test
+  public void testUnmapped() throws Exception {
+    input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.coordinate)
+        .getAbsolutePath();
+
+    completeSetupWithBoundedTraversal(null, true);
+
+    jobContext.getConfiguration().setInt(FileInputFormat.SPLIT_MAXSIZE, 40000);
+    BAMInputFormat inputFormat = new BAMInputFormat();
+    List<InputSplit> splits = inputFormat.getSplits(jobContext);
+    assertEquals(1, splits.size());
+    List<SAMRecord> split0Records = getSAMRecordsFromSplit(inputFormat, splits.get(0));
+    assertEquals(2, split0Records.size());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMappedOnly() throws Exception {
+    input = BAMTestUtil.writeBamFile(1000, SAMFileHeader.SortOrder.coordinate)
+        .getAbsolutePath();
+
+    // Mapped only (-XL unmapped) is currently unsupported and throws an exception.
+    completeSetupWithBoundedTraversal(null, false);
+  }
+
   private List<SAMRecord> getSAMRecordsFromSplit(BAMInputFormat inputFormat,
       InputSplit split) throws Exception {
     RecordReader<LongWritable, SAMRecordWritable> reader = inputFormat
diff --git a/src/test/java/org/seqdoop/hadoop_bam/TestBAMOutputFormat.java b/src/test/java/org/seqdoop/hadoop_bam/TestBAMOutputFormat.java
index c7b456c..357cec2 100644
--- a/src/test/java/org/seqdoop/hadoop_bam/TestBAMOutputFormat.java
+++ b/src/test/java/org/seqdoop/hadoop_bam/TestBAMOutputFormat.java
@@ -201,7 +201,7 @@ public void testBAMWithSplittingBai() throws Exception {
         new SAMRecordSetBuilder(true, SAMFileHeader.SortOrder.coordinate).getHeader());
 
     final int actualCount = getBAMRecordCount(outFile);
-    assertEquals(numPairs * 2, actualCount);
+    assertEquals(numPairs * 2 + 2, actualCount); // 2 unmapped reads
 
     File splittingBai = new File(outFile.getParentFile(), outFile.getName() +
         SplittingBAMIndexer.OUTPUT_FILE_EXTENSION);
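The +2 above matches the two unplaced unmapped fragments BAMTestUtil now appends per
file. A quick way to double-check a produced BAM with plain htsjdk (file name
illustrative):

    import htsjdk.samtools.SAMRecord;
    import htsjdk.samtools.SamReader;
    import htsjdk.samtools.SamReaderFactory;
    import java.io.File;

    try (SamReader reader = SamReaderFactory.makeDefault().open(new File("out.bam"))) {
      long count = 0;
      for (SAMRecord r : reader) {
        count++;
      }
      System.out.println(count); // numPairs * 2 mapped reads + 2 unplaced unmapped
    }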