From d8fbc5c6d02297e3d1db4e68b8bed78302e844ac Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Sun, 11 Sep 2016 17:17:54 -0700 Subject: [PATCH] [ADAM-1165] Support merging shards across multiple file systems. Resolves #1165. --- .../org/bdgenomics/adam/cli/MergeShards.scala | 5 ++- .../org/bdgenomics/adam/rdd/FileMerger.scala | 41 ++++++++++++++++--- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/MergeShards.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/MergeShards.scala index 66e4437388..f4a4ee2b76 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/MergeShards.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/MergeShards.scala @@ -67,10 +67,11 @@ class MergeShards(val args: MergeShardsArgs) extends BDGSparkCommand[MergeShards val optHeadPath = Option(args.headerPath).map(p => new Path(p)) val tailPath = new Path(args.inputPath) val outputPath = new Path(args.outputPath) - val fs = tailPath.getFileSystem(conf) + val fsIn = tailPath.getFileSystem(conf) + val fsOut = outputPath.getFileSystem(conf) // merge the files - FileMerger.mergeFiles(fs, + FileMerger.mergeFilesAcrossFilesystems(fsIn, fsOut, outputPath, tailPath, optHeadPath, writeEmptyGzipBlock = args.gzipAtEof, bufferSize = args.bufferSize) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala index 40bbc5cb12..a3410bd023 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala @@ -38,6 +38,8 @@ private[adam] object FileMerger extends Logging { * been written. * @param writeEmptyGzipBlock If true, we write an empty GZIP block at the * end of the merged file. + * + * @see mergeFilesAcrossFilesystems */ def mergeFiles(fs: FileSystem, outputPath: Path, @@ -45,9 +47,36 @@ private[adam] object FileMerger extends Logging { optHeaderPath: Option[Path] = None, writeEmptyGzipBlock: Boolean = false, bufferSize: Int = 4 * 1024 * 1024) { + mergeFilesAcrossFilesystems(fs, fs, + outputPath, tailPath, optHeaderPath = optHeaderPath, + writeEmptyGzipBlock = writeEmptyGzipBlock, + bufferSize = bufferSize) + } + + /** + * Merges together sharded files, while preserving partition ordering. + * + * Can read files from a different filesystem then they are written to. + * + * @param fsIn The file system implementation to use for the tail/head paths. + * @param fsOut The file system implementation to use for the output path. + * @param outputPath The location to write the merged file at. + * @param tailPath The location where the sharded files have been written. + * @param optHeaderPath Optionally, the location where a header file has + * been written. + * @param writeEmptyGzipBlock If true, we write an empty GZIP block at the + * end of the merged file. + */ + def mergeFilesAcrossFilesystems(fsIn: FileSystem, + fsOut: FileSystem, + outputPath: Path, + tailPath: Path, + optHeaderPath: Option[Path] = None, + writeEmptyGzipBlock: Boolean = false, + bufferSize: Int = 4 * 1024 * 1024) { // get a list of all of the files in the tail file - val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath))) + val tailFiles = fsIn.globStatus(new Path("%s/part-*".format(tailPath))) .toSeq .map(_.getPath) .sortBy(_.getName) @@ -66,7 +95,7 @@ private[adam] object FileMerger extends Logging { // but! it is correct. // open our output file - val os = fs.create(outputPath) + val os = fsOut.create(outputPath) // here is a byte array for copying val ba = new Array[Byte](bufferSize) @@ -91,7 +120,7 @@ private[adam] object FileMerger extends Logging { log.info("Copying header file (%s)".format(p)) // open our input file - val is = fs.open(p) + val is = fsIn.open(p) // until we are out of bytes, copy copy(is, os) @@ -112,7 +141,7 @@ private[adam] object FileMerger extends Logging { numFiles)) // open our input file - val is = fs.open(p) + val is = fsIn.open(p) // until we are out of bytes, copy copy(is, os) @@ -134,7 +163,7 @@ private[adam] object FileMerger extends Logging { os.close() // delete temp files - optHeaderPath.foreach(headPath => fs.delete(headPath, true)) - fs.delete(tailPath, true) + optHeaderPath.foreach(headPath => fsIn.delete(headPath, true)) + fsIn.delete(tailPath, true) } }