Skip to content

Commit

Permalink
[ADAM-1165] Support merging shards across multiple file systems.
Browse files Browse the repository at this point in the history
Resolves #1165.
  • Loading branch information
fnothaft committed Sep 15, 2016
1 parent a39dd39 commit 4ae60f6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ class MergeShards(val args: MergeShardsArgs) extends BDGSparkCommand[MergeShards
val optHeadPath = Option(args.headerPath).map(p => new Path(p))
val tailPath = new Path(args.inputPath)
val outputPath = new Path(args.outputPath)
val fs = tailPath.getFileSystem(conf)
val fsIn = tailPath.getFileSystem(conf)
val fsOut = outputPath.getFileSystem(conf)

// merge the files
FileMerger.mergeFiles(fs,
FileMerger.mergeFilesAcrossFilesystems(fsIn, fsOut,
outputPath, tailPath, optHeadPath,
writeEmptyGzipBlock = args.gzipAtEof,
bufferSize = args.bufferSize)
Expand Down
45 changes: 39 additions & 6 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ private[adam] object FileMerger extends Logging {
* end of the merged file.
* @param writeCramEOF If true, we write CRAM's EOF signifier.
* @param bufferSize The size in bytes of the buffer used for copying.
*
* @see mergeFilesAcrossFilesystems
*/
def mergeFiles(fs: FileSystem,
outputPath: Path,
Expand All @@ -50,14 +52,45 @@ private[adam] object FileMerger extends Logging {
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
bufferSize: Int = 4 * 1024 * 1024) {
mergeFilesAcrossFilesystems(fs, fs,
outputPath, tailPath, optHeaderPath = optHeaderPath,
writeEmptyGzipBlock = writeEmptyGzipBlock,
writeCramEOF = writeCramEOF,
bufferSize = bufferSize)
}

/**
* Merges together sharded files, while preserving partition ordering.
*
* Can read files from a different filesystem than they are written to.
*
* @param fsIn The file system implementation to use for the tail/head paths.
* @param fsOut The file system implementation to use for the output path.
* @param outputPath The location to write the merged file at.
* @param tailPath The location where the sharded files have been written.
* @param optHeaderPath Optionally, the location where a header file has
* been written.
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
* @param writeCramEOF If true, we write CRAM's EOF signifier.
* @param bufferSize The size in bytes of the buffer used for copying.
*/
def mergeFilesAcrossFilesystems(fsIn: FileSystem,
fsOut: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
bufferSize: Int = 4 * 1024 * 1024) {

require(bufferSize > 0,
"Cannot have buffer size < 1. %d was provided.".format(bufferSize))
require(!(writeEmptyGzipBlock && writeCramEOF),
"writeEmptyGzipBlock and writeCramEOF are mutually exclusive.")

// get a list of all of the files in the tail file
val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath)))
val tailFiles = fsIn.globStatus(new Path("%s/part-*".format(tailPath)))
.toSeq
.map(_.getPath)
.sortBy(_.getName)
Expand All @@ -76,7 +109,7 @@ private[adam] object FileMerger extends Logging {
// but! it is correct.

// open our output file
val os = fs.create(outputPath)
val os = fsOut.create(outputPath)

// here is a byte array for copying
val ba = new Array[Byte](bufferSize)
Expand All @@ -101,7 +134,7 @@ private[adam] object FileMerger extends Logging {
log.info("Copying header file (%s)".format(p))

// open our input file
val is = fs.open(p)
val is = fsIn.open(p)

// until we are out of bytes, copy
copy(is, os)
Expand All @@ -122,7 +155,7 @@ private[adam] object FileMerger extends Logging {
numFiles))

// open our input file
val is = fs.open(p)
val is = fsIn.open(p)

// until we are out of bytes, copy
copy(is, os)
Expand All @@ -146,7 +179,7 @@ private[adam] object FileMerger extends Logging {
os.close()

// delete temp files
optHeaderPath.foreach(headPath => fs.delete(headPath, true))
fs.delete(tailPath, true)
optHeaderPath.foreach(headPath => fsIn.delete(headPath, true))
fsIn.delete(tailPath, true)
}
}

0 comments on commit 4ae60f6

Please sign in to comment.