Skip to content

Commit

Permalink
[SPARK-20848][SQL] Shutdown the pool after reading parquet files
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state, and never stopped, which leads to unbounded growth in number of threads.

We should shutdown the pool after reading parquet files.

## How was this patch tested?

Added a test to ParquetFileFormatSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#18073 from viirya/SPARK-20848.

(cherry picked from commit f72ad30)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
viirya authored and cloud-fan committed May 24, 2017
1 parent 13adc0f commit 2f68631
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,8 @@ object ParquetFileFormat extends Logging {
partFiles: Seq[FileStatus],
ignoreCorruptFiles: Boolean): Seq[Footer] = {
val parFiles = partFiles.par
parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
val pool = new ForkJoinPool(8)
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
Expand All @@ -512,6 +513,8 @@ object ParquetFileFormat extends Logging {
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
}
} finally {
pool.shutdown()
}
}.seq
}
Expand Down

0 comments on commit 2f68631

Please sign in to comment.