Renames CatalystWriteSupport to ParquetWriteSupport
liancheng committed Jul 6, 2016
1 parent 21eadd1 commit 3084b35
Showing 2 changed files with 10 additions and 10 deletions.
File 1 of 2:

@@ -99,13 +99,13 @@ private[sql] class ParquetFileFormat
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])

     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)

     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).

@@ -295,14 +295,14 @@ private[sql] class ParquetFileFormat
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
     hadoopConf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)

     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)

     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(

@@ -435,14 +435,14 @@ private[sql] class ParquetOutputWriterFactory(
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])

     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)

     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).

@@ -611,7 +611,7 @@ private[sql] object ParquetFileFormat extends Logging {
       })

     conf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(dataSchema).json)

     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
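The hunks above all touch the same write-side wiring: register the write support class with ParquetOutputFormat, then publish the Catalyst schema through the Hadoop configuration. Below is a condensed sketch of that call-site pattern using only the calls visible in the diff; the object name ParquetWriteConfigSketch, the method name configure, and the use of job.getConfiguration are illustrative assumptions, and the snippet only compiles inside Spark's own parquet datasource package because the classes involved are private[sql]/private[parquet].

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the prepareWrite-style call sites changed above.
object ParquetWriteConfigSketch {
  def configure(job: Job, dataSchema: StructType): Unit = {
    // Bind Parquet's output format and the renamed write support class to the job.
    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])

    // Drop the temporary optional-field metadata so it is not persisted into the
    // Parquet footer; it is only needed for filter pushdown.
    val dataSchemaToWrite = StructType.removeMetadata(
      StructType.metadataKeyForOptionalField,
      dataSchema).asInstanceOf[StructType]

    // Publish the Catalyst schema through the Hadoop configuration so that
    // ParquetWriteSupport.init() can recover it when rows are written.
    ParquetWriteSupport.setSchema(dataSchemaToWrite, job.getConfiguration)
  }
}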
File 2 of 2:

@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
  * of this option is propagated to this class by the `init()` method and its Hadoop configuration
  * argument.
  */
-private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
+private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
   // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
   // data in `ArrayData` without the help of `SpecificMutableRow`.

@@ -73,7 +73,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
   private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))

   override def init(configuration: Configuration): WriteContext = {
-    val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
+    val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
     this.writeLegacyParquetFormat = {
       // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation

@@ -424,7 +424,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
   }
 }

-private[parquet] object CatalystWriteSupport {
+private[parquet] object ParquetWriteSupport {
   val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"

   def setSchema(schema: StructType, configuration: Configuration): Unit = {
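The init() hunk shows the consumer half of the schema handshake: the schema arrives as JSON under SPARK_ROW_SCHEMA and is rebuilt with StructType.fromString. The body of setSchema is collapsed above, so the following is only a minimal round-trip sketch, assuming setSchema stores the schema's JSON representation under that same key; the object name SchemaHandshakeSketch and the sample schema are illustrative.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical round-trip check over the SPARK_ROW_SCHEMA handshake.
object SchemaHandshakeSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType)))

    // Producer side: the changed call sites publish the schema before the write job starts.
    val conf = new Configuration()
    ParquetWriteSupport.setSchema(schema, conf)

    // Consumer side, mirroring ParquetWriteSupport.init(): read the JSON back and
    // rebuild the StructType before any rows are written.
    val restored = StructType.fromString(conf.get(ParquetWriteSupport.SPARK_ROW_SCHEMA))

    // Expected to hold if setSchema stores the schema's JSON verbatim under the key.
    assert(restored == schema)
  }
}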
