Skip to content

Commit

Permalink
[ADAM-962] Fix corrupt single-file BAM output.
Browse files Browse the repository at this point in the history
It seems like we were doing something incorrectly when writing the header.
Additionally, we now write a correct end-of-file. Resolves bigdatagenomics#962.
  • Loading branch information
fnothaft committed Jul 16, 2016
1 parent 51b755b commit 75c64b4
Showing 1 changed file with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
*/
package org.bdgenomics.adam.rdd.read

import htsjdk.samtools._
import htsjdk.samtools.util.{
BinaryCodec,
BlockCompressedOutputStream,
BlockCompressedStreamConstants
}
import java.io.{ InputStream, OutputStream, StringWriter, Writer }
import java.lang.reflect.InvocationTargetException
import htsjdk.samtools._
import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
import org.apache.avro.Schema
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.broadcast.Broadcast
Expand All @@ -38,7 +45,9 @@ import org.bdgenomics.adam.rdd.{ ADAMRDDFunctions, ADAMSaveAnyArgs }
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.util.MapTools
import org.bdgenomics.formats.avro._
import org.seqdoop.hadoop_bam.SAMRecordWritable
import org.bdgenomics.utils.misc.Logging
import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable }
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer
import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.math.{ abs, min }
Expand Down Expand Up @@ -351,7 +360,6 @@ private[rdd] class AlignmentRecordRDDFunctions(val rdd: RDD[AlignmentRecord])
compressedOut.close()
}

// more flushing and closing
os.flush()
os.close()

Expand Down Expand Up @@ -426,31 +434,36 @@ private[rdd] class AlignmentRecordRDDFunctions(val rdd: RDD[AlignmentRecord])
// open our output file
val os = fs.create(outputPath)

// prepend our header to the list of files to copy
val filesToCopy = Seq(headPath) ++ tailFiles.toSeq
// prepare output
val format = if (asSam) {
SAMFormat.SAM
} else {
SAMFormat.BAM
}
new SAMOutputPreparer().prepareForRecords(os, format, header);

// here is a byte array for copying
val ba = new Array[Byte](1024)

@tailrec def copy(is: InputStream,
os: OutputStream) {
los: OutputStream) {

// make a read
val bytesRead = is.read(ba)

// did our read succeed? if so, write to output stream
// and continue
if (bytesRead >= 0) {
os.write(ba, 0, bytesRead)
los.write(ba, 0, bytesRead)

copy(is, os)
copy(is, los)
}
}

// loop over allllll the files and copy them
val numFiles = filesToCopy.length
val numFiles = tailFiles.length
var filesCopied = 1
filesToCopy.foreach(p => {
tailFiles.toSeq.foreach(p => {

// print a bit of progress logging
log.info("Copying file %s, file %d of %d.".format(
Expand All @@ -471,6 +484,11 @@ private[rdd] class AlignmentRecordRDDFunctions(val rdd: RDD[AlignmentRecord])
filesCopied += 1
})

// finish the file off
if (!asSam) {
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
}

// flush and close the output stream
os.flush()
os.close()
Expand Down

0 comments on commit 75c64b4

Please sign in to comment.