Skip to content

Commit

Permalink
[ADAM-962] Fix corrupt single-file BAM output.
Browse files Browse the repository at this point in the history
It seems like we were doing something incorrectly when writing the header.
Additionally, we now write a correct end-of-file. Resolves bigdatagenomics#962.
  • Loading branch information
fnothaft committed Jul 19, 2016
1 parent 496add6 commit 4de31aa
Showing 1 changed file with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
*/
package org.bdgenomics.adam.rdd.read

import htsjdk.samtools._
import htsjdk.samtools.util.{
BinaryCodec,
BlockCompressedOutputStream,
BlockCompressedStreamConstants
}
import java.io.{ InputStream, OutputStream, StringWriter, Writer }
import htsjdk.samtools._
import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
import org.apache.avro.Schema
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.broadcast.Broadcast
Expand All @@ -45,12 +52,10 @@ import org.bdgenomics.adam.rdd.read.recalibration.BaseQualityRecalibration
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.util.MapTools
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Contig,
RecordGroupMetadata
}
import org.seqdoop.hadoop_bam.SAMRecordWritable
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.misc.Logging
import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable }
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer
import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.math.{ abs, min }
Expand Down Expand Up @@ -385,7 +390,6 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
compressedOut.close()
}

// more flushing and closing
os.flush()
os.close()

Expand Down Expand Up @@ -460,31 +464,36 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
// open our output file
val os = fs.create(outputPath)

// prepend our header to the list of files to copy
val filesToCopy = Seq(headPath) ++ tailFiles.toSeq
// prepare output
val format = if (asSam) {
SAMFormat.SAM
} else {
SAMFormat.BAM
}
new SAMOutputPreparer().prepareForRecords(os, format, header);

// here is a byte array for copying
val ba = new Array[Byte](1024)

@tailrec def copy(is: InputStream,
os: OutputStream) {
los: OutputStream) {

// make a read
val bytesRead = is.read(ba)

// did our read succeed? if so, write to output stream
// and continue
if (bytesRead >= 0) {
os.write(ba, 0, bytesRead)
los.write(ba, 0, bytesRead)

copy(is, os)
copy(is, los)
}
}

// loop over allllll the files and copy them
val numFiles = filesToCopy.length
val numFiles = tailFiles.length
var filesCopied = 1
filesToCopy.foreach(p => {
tailFiles.toSeq.foreach(p => {

// print a bit of progress logging
log.info("Copying file %s, file %d of %d.".format(
Expand All @@ -505,6 +514,11 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
filesCopied += 1
})

// finish the file off
if (!asSam) {
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
}

// flush and close the output stream
os.flush()
os.close()
Expand Down

0 comments on commit 4de31aa

Please sign in to comment.