SPARK-17059: Allow FileFormat to specify partition pruning strategy (a…
pwoody authored and robert3005 committed Nov 18, 2016
1 parent aa4e13d commit 7edbd64
Showing 4 changed files with 214 additions and 73 deletions.
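The core API change is in the FileFormat trait (second file below): the per-file getSplits hook, which took a FileStatus and returned that file's splits, is replaced by buildSplitter, which is called once per scan and returns a function that is then applied to each file. A minimal sketch of the new hook's shape, assuming the Spark and Hadoop classes named in the diff are on the classpath (the trait name here is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileIndex
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

trait FileSplitterSketch {
  // Built once per scan; the returned function is applied to every file in the scan.
  def buildSplitter(
      sparkSession: SparkSession,
      fileIndex: FileIndex,
      filters: Seq[Filter],
      schema: StructType,
      hadoopConf: Configuration): FileStatus => Seq[FileSplit] = {
    // Default behaviour, matching the old getSplits: one split covering the whole file.
    stat => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
  }
}
```

FileSourceScanExec (first file below) builds this function once per scan and applies it to every selected file.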
@@ -438,14 +438,16 @@ case class FileSourceScanExec(
val partitionFiles = selectedPartitions.flatMap { partition =>
partition.files.map((_, partition.values))
}
val format = fsRelation.fileFormat
val splitter =
format.buildSplitter(session, fsRelation.location,
dataFilters, schema, session.sessionState.newHadoopConf())

val bucketed = partitionFiles.flatMap { case (file, values) =>
val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
val format = fsRelation.fileFormat

if (format.isSplitable(session, fsRelation.options, file.getPath)) {
val validSplits = format.getSplits(session, fsRelation.location, file,
dataFilters, schema, session.sessionState.newHadoopConf())
val validSplits = splitter(file)
validSplits.map { split =>
val hosts = getBlockHosts(blockLocations, split.getStart, split.getLength)
PartitionedFile(values, filePath, split.getStart, split.getLength, hosts)
@@ -492,15 +494,18 @@ case class FileSourceScanExec(
val partitionFiles = selectedPartitions.flatMap { partition =>
partition.files.map((_, partition.values))
}
val format = fsRelation.fileFormat
val splitter =
format.buildSplitter(session, fsRelation.location,
dataFilters, schema, session.sessionState.newHadoopConf())

val splitFiles = partitionFiles.flatMap { case (file, values) =>
val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
val format = fsRelation.fileFormat

// If the format is splittable, attempt to split and filter the file.
if (format.isSplitable(session, fsRelation.options, file.getPath)) {
val validSplits = format.getSplits(session, fsRelation.location, file,
dataFilters, schema, session.sessionState.newHadoopConf())
val validSplits = splitter(file)
validSplits.flatMap { split =>
val splitOffset = split.getStart
val end = splitOffset + split.getLength
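Both FileSourceScanExec paths above (the bucketed and non-bucketed read paths) follow the same pattern: the splitter is built once, outside the per-file loop, so any per-scan setup it needs happens once rather than once per file, and non-splittable formats still fall back to a single whole-file split. A condensed, self-contained sketch of that flow (the object and function names here are illustrative, not Spark API):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.lib.input.FileSplit

object ScanPlanningSketch {
  // `splitter` stands for the function returned by FileFormat.buildSplitter;
  // everything else is illustrative rather than Spark API.
  def planScan(
      splitter: FileStatus => Seq[FileSplit],
      isSplitable: FileStatus => Boolean,
      files: Seq[FileStatus]): Seq[FileSplit] = {
    files.flatMap { file =>
      if (isSplitable(file)) {
        // Splittable file: keep only the splits the splitter deems potentially relevant.
        splitter(file)
      } else {
        // Non-splittable file: a single split covering the whole file.
        Seq(new FileSplit(file.getPath, 0, file.getLen, Array.empty))
      }
    }
  }
}
```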
@@ -90,16 +90,15 @@ trait FileFormat {
}

/**
* For a file, return valid splits that may pass the given data filter.
* Allow a splittable FileFormat to produce a function to split individual files.
*/
def getSplits(
def buildSplitter(
sparkSession: SparkSession,
fileIndex: FileIndex,
fileStatus: FileStatus,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
Seq(new FileSplit(fileStatus.getPath, 0, fileStatus.getLen, Array.empty))
hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
stat => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
}

/**
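Returning a function rather than splitting one file at a time lets an implementation do its expensive work when the splitter is built and keep the per-file closure cheap; the ParquetFileFormat change below does exactly that with Parquet summary metadata. A hypothetical override sketching the idea (the index type and the loadIndex helper are invented for illustration; only the buildSplitter signature comes from the diff):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileIndex
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical format: everything except the buildSplitter signature is invented.
class IndexedFormatSketch {
  def buildSplitter(
      sparkSession: SparkSession,
      fileIndex: FileIndex,
      filters: Seq[Filter],
      schema: StructType,
      hadoopConf: Configuration): FileStatus => Seq[FileSplit] = {
    // Expensive per-scan work happens once, outside the returned closure.
    val index: Map[String, Seq[(Long, Long)]] = loadIndex(fileIndex, filters, hadoopConf)
    (stat: FileStatus) => index.get(stat.getPath.toString) match {
      // The (hypothetical) index lists byte ranges that may satisfy the filters.
      case Some(ranges) =>
        ranges.map { case (start, len) => new FileSplit(stat.getPath, start, len, Array.empty) }
      // Unknown file: retaining the whole file is always a safe answer.
      case None => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
    }
  }

  // Placeholder: a real format would read its own index or footer metadata here.
  private def loadIndex(
      fileIndex: FileIndex,
      filters: Seq[Filter],
      hadoopConf: Configuration): Map[String, Seq[(Long, Long)]] = Map.empty
}
```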
@@ -19,19 +19,21 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.io.FileNotFoundException
import java.net.URI
import java.util.concurrent.{Callable, TimeUnit}
import java.util.logging.{Logger => JLogger}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Try}

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop._
@@ -278,75 +280,47 @@ class ParquetFileFormat
true
}

override def getSplits(
sparkSession: SparkSession,
fileIndex: FileIndex,
fileStatus: FileStatus,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
if (filters.isEmpty || !sparkSession.sessionState.conf.parquetPartitionPruningEnabled) {
override def buildSplitter(
sparkSession: SparkSession,
fileIndex: FileIndex,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
val pruningEnabled = sparkSession.sessionState.conf.parquetPartitionPruningEnabled
val defaultSplitter = super.buildSplitter(sparkSession, fileIndex, filters, schema, hadoopConf)
if (!pruningEnabled || filters.isEmpty) {
// Return immediately to save FileSystem overhead
super.getSplits(sparkSession, fileIndex, fileStatus, filters, schema, hadoopConf)
defaultSplitter
} else {
val filePath = fileStatus.getPath
val rootOption: Option[Path] = fileIndex.rootPaths
.find(root => filePath.toString.startsWith(root.toString))
val metadataOption = rootOption.flatMap { root =>
cachedMetadata.get(root).orElse(getMetadataForPath(filePath, root, hadoopConf))
.map { metadata =>
cachedMetadata.put(root, metadata)
metadata
}
val splitters = fileIndex.rootPaths.map { root =>
val splits = ParquetFileFormat.fileSplits.get(root,
new Callable[ParquetFileSplitter] {
override def call(): ParquetFileSplitter =
createParquetFileSplits(root, hadoopConf, schema)
})
root -> splits.buildSplitter(filters)
}.toMap
val compositeSplitter: (FileStatus => Seq[FileSplit]) = { stat =>
val filePath = stat.getPath
val rootOption: Option[Path] = fileIndex.rootPaths
.find(root => filePath.toString.startsWith(root.toString))
val splitterForPath = rootOption.flatMap(splitters.get).getOrElse(defaultSplitter)
splitterForPath(stat)
}
// If the metadata exists, filter the splits.
// Otherwise, fall back to the default implementation.
metadataOption
.map(filterToSplits(fileStatus, _, rootOption.get, filters, schema, hadoopConf))
.getOrElse(super.getSplits(sparkSession, fileIndex, fileStatus,
filters, schema, hadoopConf))
compositeSplitter
}
}

private def filterToSplits(
fileStatus: FileStatus,
metadata: ParquetMetadata,
metadataRoot: Path,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
val metadataBlocks = metadata.getBlocks

// Ensure that the metadata has an entry for the file.
// If it does not, do not filter at this stage.
val metadataContainsPath = metadataBlocks.asScala.exists { bmd =>
new Path(metadataRoot, bmd.getPath) == fileStatus.getPath
}
if (!metadataContainsPath) {
log.warn(s"Found _metadata file for $metadataRoot," +
s" but no entries for blocks in ${fileStatus.getPath}. Retaining whole file.")
return Seq(new FileSplit(fileStatus.getPath, 0, fileStatus.getLen, Array.empty))
}

val parquetSchema = metadata.getFileMetaData.getSchema
val filter = FilterCompat.get(filters
.flatMap(ParquetFilters.createFilter(schema, _))
.reduce(FilterApi.and))
val filteredMetadata =
RowGroupFilter.filterRowGroups(filter, metadataBlocks, parquetSchema).asScala
filteredMetadata.flatMap { bmd =>
val bmdPath = new Path(metadataRoot, bmd.getPath)
val fsPath = fileStatus.getPath
if (bmdPath == fsPath) {
Some(new FileSplit(bmdPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty))
} else {
None
}
}
private def createParquetFileSplits(
root: Path,
hadoopConf: Configuration,
schema: StructType): ParquetFileSplitter = {
getMetadataForPath(root, hadoopConf)
.map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema))
.getOrElse(ParquetDefaultFileSplitter)
}

private def getMetadataForPath(
filePath: Path,
rootPath: Path,
conf: Configuration): Option[ParquetMetadata] = {
val fs = rootPath.getFileSystem(conf)
@@ -523,6 +497,21 @@ class ParquetFileFormat
}

object ParquetFileFormat extends Logging {

@transient private val fileSplits: Cache[Path, ParquetFileSplitter] =
CacheBuilder.newBuilder()
.expireAfterAccess(4, TimeUnit.HOURS)
.concurrencyLevel(1)
.softValues()
.removalListener(new RemovalListener[Path, ParquetFileSplitter] {
override def onRemoval(removalNotification:
RemovalNotification[Path, ParquetFileSplitter]): Unit = {
val path = removalNotification.getKey
log.info(s"Removing value for path $path from cache, " +
s"cause: ${removalNotification.getCause}")
}
}).build()

private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {

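ParquetFileFormat now memoizes one ParquetFileSplitter per root path in a Guava cache, so the summary metadata for a root is read once and reused across scans in the same JVM for as long as the entry stays cached (entries expire after four hours of disuse and are held as soft values). A standalone sketch of the same caching pattern, with the key and value types simplified to plain strings for illustration:

```scala
import java.util.concurrent.{Callable, TimeUnit}

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}

object SplitterCacheSketch {
  // Same caching shape as ParquetFileFormat.fileSplits above, with simplified types.
  private val cache: Cache[String, String] =
    CacheBuilder.newBuilder()
      .expireAfterAccess(4, TimeUnit.HOURS) // drop roots that have not been scanned recently
      .concurrencyLevel(1)
      .softValues()                         // let the GC reclaim entries under memory pressure
      .removalListener(new RemovalListener[String, String] {
        override def onRemoval(n: RemovalNotification[String, String]): Unit =
          println(s"evicted ${n.getKey}, cause: ${n.getCause}")
      })
      .build()

  def splitterFor(root: String): String = {
    // get(key, loader) runs the loader only on a cache miss and stores its result.
    cache.get(root, new Callable[String] {
      override def call(): String = expensiveMetadataRead(root)
    })
  }

  // Placeholder for the expensive per-root work (reading the _metadata footer).
  private def expensiveMetadataRead(root: String): String = s"splitter-for-$root"
}
```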
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
import org.apache.parquet.hadoop.metadata.BlockMetaData
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils


abstract class ParquetFileSplitter {
def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])

def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
}
}

object ParquetDefaultFileSplitter extends ParquetFileSplitter {
override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
stat => singleFileSplit(stat)
}
}

class ParquetMetadataFileSplitter(
val root: Path,
val blocks: Seq[BlockMetaData],
val schema: StructType)
extends ParquetFileSplitter
with Logging {

private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet

private val filterSets: Cache[Filter, RoaringBitmap] =
CacheBuilder.newBuilder()
.expireAfterAccess(4, TimeUnit.HOURS)
.concurrencyLevel(1)
.build()

override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
val (applied, unapplied, filteredBlocks) = this.synchronized {
val (applied, unapplied) = filters.partition(filterSets.getIfPresent(_) != null)
val filteredBlocks = filterSets.getAllPresent(applied.asJava).values().asScala
.reduceOption(RoaringBitmap.and)
.map { bitmap =>
blocks.zipWithIndex.filter { case(block, index) =>
bitmap.contains(index)
}.map(_._1)
}.getOrElse(blocks)
(applied, unapplied, filteredBlocks)
}

val eligible = parquetFilter(unapplied, filteredBlocks).map { bmd =>
val blockPath = new Path(root, bmd.getPath)
new FileSplit(blockPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty)
}

val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
if (referencedFiles.contains(stat.getPath)) {
eligible.filter(_.getPath == stat.getPath)
} else {
log.warn(s"Found _metadata file for $root," +
s" but no entries for blocks in ${stat.getPath}. Retaining whole file.")
singleFileSplit(stat)
}
}
statFilter
}

private def parquetFilter(
filters: Seq[Filter],
blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
if (filters.nonEmpty) {
// Asynchronously build bitmaps
Future {
buildFilterBitMaps(filters)
}(ParquetMetadataFileSplitter.executionContext)

val predicate = filters.flatMap {
ParquetFilters.createFilter(schema, _)
}.reduce(FilterApi.and)
blocks.filter(bmd => !StatisticsFilter.canDrop(predicate, bmd.getColumns))
} else {
blocks
}
}

private def buildFilterBitMaps(filters: Seq[Filter]): Unit = {
this.synchronized {
// Only build bitmaps for filters that do not have one cached yet.
val sets = filters
.filter(filterSets.getIfPresent(_) == null)
.flatMap { filter =>
val bitmap = new RoaringBitmap
ParquetFilters.createFilter(schema, filter)
.map((filter, _, bitmap))
}
var i = 0
val blockLen = blocks.size
while (i < blockLen) {
val bmd = blocks(i)
sets.foreach { case (filter, parquetFilter, bitmap) =>
if (!StatisticsFilter.canDrop(parquetFilter, bmd.getColumns)) {
bitmap.add(i)
}
}
i += 1
}
val mapping = sets.map { case (filter, _, bitmap) =>
bitmap.runOptimize()
filter -> bitmap
}.toMap.asJava
filterSets.putAll(mapping)
}
}
}
object ParquetMetadataFileSplitter {
private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("parquet-metadata-filter", 1))
}

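ParquetMetadataFileSplitter keeps, per filter, a RoaringBitmap of the row-group indices that the filter cannot drop; on later scans the cached bitmaps are intersected, and only the still-unapplied filters are evaluated against row-group statistics. A small standalone sketch of that bitmap bookkeeping, with a generic mayMatch predicate standing in for Parquet's StatisticsFilter check:

```scala
import org.roaringbitmap.RoaringBitmap

object FilterBitmapSketch {
  // Build one bitmap per filter, marking the indices of blocks the filter may still match.
  def buildBitmaps[B, F](blocks: Seq[B], filters: Seq[F])
                        (mayMatch: (F, B) => Boolean): Map[F, RoaringBitmap] = {
    filters.map { filter =>
      val bitmap = new RoaringBitmap
      blocks.zipWithIndex.foreach { case (block, i) =>
        if (mayMatch(filter, block)) bitmap.add(i)
      }
      bitmap.runOptimize() // compact the representation, as the splitter does
      filter -> bitmap
    }.toMap
  }

  // Intersect the per-filter bitmaps: a block survives only if every filter may match it.
  def surviving[B](blocks: Seq[B], bitmaps: Iterable[RoaringBitmap]): Seq[B] = {
    bitmaps.reduceOption(RoaringBitmap.and)
      .map(hits => blocks.zipWithIndex.collect { case (block, i) if hits.contains(i) => block })
      .getOrElse(blocks)
  }
}
```

In the splitter itself the bitmaps are keyed by org.apache.spark.sql.sources.Filter and stored in a Guava cache with the same four-hour expiry, and buildFilterBitMaps populates them asynchronously on the single-thread "parquet-metadata-filter" pool so the first scan is not blocked on bitmap construction.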