From 4de31aafbbd9fd28d6051fefae3f05ec7f2716c4 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 26 Feb 2016 15:39:20 +0000 Subject: [PATCH] [ADAM-962] Fix corrupt single-file BAM output. It seems like we were doing something incorrectly when writing the header. Additionally, we now write a correct end-of-file. Resolves #962. --- .../adam/rdd/read/AlignmentRecordRDD.scala | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 6dc4be0101..2fad4f85fd 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -17,9 +17,16 @@ */ package org.bdgenomics.adam.rdd.read +import htsjdk.samtools._ +import htsjdk.samtools.util.{ + BinaryCodec, + BlockCompressedOutputStream, + BlockCompressedStreamConstants +} import java.io.{ InputStream, OutputStream, StringWriter, Writer } import htsjdk.samtools._ import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream } +import org.apache.avro.Schema import org.apache.hadoop.fs.{ FileSystem, Path } import org.apache.hadoop.io.LongWritable import org.apache.spark.broadcast.Broadcast @@ -45,12 +52,10 @@ import org.bdgenomics.adam.rdd.read.recalibration.BaseQualityRecalibration import org.bdgenomics.adam.rich.RichAlignmentRecord import org.bdgenomics.adam.util.MapTools import org.bdgenomics.adam.rdd.fragment.FragmentRDD -import org.bdgenomics.formats.avro.{ - AlignmentRecord, - Contig, - RecordGroupMetadata -} -import org.seqdoop.hadoop_bam.SAMRecordWritable +import org.bdgenomics.formats.avro._ +import org.bdgenomics.utils.misc.Logging +import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable } +import org.seqdoop.hadoop_bam.util.SAMOutputPreparer import scala.annotation.tailrec import scala.language.implicitConversions import scala.math.{ abs, min } @@ -385,7 +390,6 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, compressedOut.close() } - // more flushing and closing os.flush() os.close() @@ -460,14 +464,19 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, // open our output file val os = fs.create(outputPath) - // prepend our header to the list of files to copy - val filesToCopy = Seq(headPath) ++ tailFiles.toSeq + // prepare output + val format = if (asSam) { + SAMFormat.SAM + } else { + SAMFormat.BAM + } + new SAMOutputPreparer().prepareForRecords(os, format, header); // here is a byte array for copying val ba = new Array[Byte](1024) @tailrec def copy(is: InputStream, - os: OutputStream) { + los: OutputStream) { // make a read val bytesRead = is.read(ba) @@ -475,16 +484,16 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, // did our read succeed? if so, write to output stream // and continue if (bytesRead >= 0) { - os.write(ba, 0, bytesRead) + los.write(ba, 0, bytesRead) - copy(is, os) + copy(is, los) } } // loop over allllll the files and copy them - val numFiles = filesToCopy.length + val numFiles = tailFiles.length var filesCopied = 1 - filesToCopy.foreach(p => { + tailFiles.toSeq.foreach(p => { // print a bit of progress logging log.info("Copying file %s, file %d of %d.".format( @@ -505,6 +514,11 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, filesCopied += 1 }) + // finish the file off + if (!asSam) { + os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK); + } + // flush and close the output stream os.flush() os.close()