Commit 23eff5e5 authored by Cheng Lian, committed by Reynold Xin

[SPARK-15979][SQL] Renames CatalystWriteSupport to ParquetWriteSupport

## What changes were proposed in this pull request?

PR #13696 renamed various Parquet support classes but left `CatalystWriteSupport` behind. This PR renames it as a follow-up.

## How was this patch tested?

N/A.

Author: Cheng Lian <lian@databricks.com>

Closes #14070 from liancheng/spark-15979-follow-up.
parent 478b71d0
@@ -99,13 +99,13 @@ private[sql] class ParquetFileFormat
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
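As context for the call changed above: `ParquetOutputFormat.setWriteSupportClass` is how parquet-mr is told which `WriteSupport` implementation turns records into Parquet values, which is exactly the role of the renamed `ParquetWriteSupport`. Below is a minimal sketch of the same wiring against a plain Hadoop `Job`. It is not Spark code: `StubWriteSupport` and `WriteSupportWiringSketch` are made-up names, and the stub stands in for `ParquetWriteSupport`, which is `private[parquet]` and not usable from outside Spark.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.RecordConsumer

// Placeholder write support: bodies are elided here; a complete toy
// implementation of the same contract is sketched at the end of this page.
class StubWriteSupport extends WriteSupport[Seq[Any]] {
  override def init(conf: Configuration): WriteContext = ???
  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = ???
  override def write(record: Seq[Any]): Unit = ???
}

object WriteSupportWiringSketch {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration())

    // The same two calls as in the hunk above: emit Parquet files, and register
    // the WriteSupport class that converts each record into Parquet values.
    job.setOutputFormatClass(classOf[ParquetOutputFormat[Seq[Any]]])
    ParquetOutputFormat.setWriteSupportClass(job, classOf[StubWriteSupport])
  }
}
```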
@@ -295,14 +295,14 @@ private[sql] class ParquetFileFormat
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
     hadoopConf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
@@ -435,14 +435,14 @@ private[sql] class ParquetOutputWriterFactory(
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -611,7 +611,7 @@ private[sql] object ParquetFileFormat extends Logging {
       })
     conf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(dataSchema).json)
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
  * of this option is propagated to this class by the `init()` method and its Hadoop configuration
  * argument.
  */
-private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
+private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
   // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
   // data in `ArrayData` without the help of `SpecificMutableRow`.
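The `ValueWriter` comment above describes a design the class keeps after the rename: one writer closure per column, chosen once from the schema and then invoked by ordinal for every row. The snippet below is a deliberately tiny sketch of that dispatch pattern, not Spark's code; it uses a plain `Seq[Any]` in place of `SpecializedGetters`/`InternalRow` and prints instead of calling a Parquet `RecordConsumer`.

```scala
object ValueWriterSketch {
  // Stand-in for the (SpecializedGetters, ordinal)-based ValueWriter described above.
  type ValueWriter = (Seq[Any], Int) => Unit

  // Pick a writer once per field, based on the field's (string-encoded) type.
  def makeWriter(typeName: String): ValueWriter = typeName match {
    case "int"    => (row, ordinal) => println(s"int    field $ordinal = ${row(ordinal)}")
    case "string" => (row, ordinal) => println(s"string field $ordinal = ${row(ordinal)}")
    case other    => throw new UnsupportedOperationException(s"Unsupported type: $other")
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("int", "string")
    val fieldWriters: Array[ValueWriter] = schema.map(makeWriter).toArray

    // Per row, dispatch each field to its pre-selected writer by ordinal.
    val row = Seq(42, "hello")
    fieldWriters.zipWithIndex.foreach { case (writer, ordinal) => writer(row, ordinal) }
  }
}
```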
@@ -73,7 +73,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
   private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
 
   override def init(configuration: Configuration): WriteContext = {
-    val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
+    val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
     this.writeLegacyParquetFormat = {
       // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
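The `ParquetFileFormat` hunks earlier set `ParquetWriteSupport.SPARK_ROW_SCHEMA` ("org.apache.spark.sql.parquet.row.attributes") to the schema's JSON form, and `init()` above reads it back on the task side. A minimal sketch of that Configuration round trip is below; it is not Spark's code, uses the public `DataType.fromJson` instead of the `private[sql]` `StructType.fromString` seen in the diff, and uses a placeholder key name.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructType}

object SchemaRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder key; Spark uses ParquetWriteSupport.SPARK_ROW_SCHEMA.
    val key = "example.row.schema"

    val schema = new StructType().add("id", IntegerType).add("name", StringType)

    // Producer side (the essence of ParquetWriteSupport.setSchema): serialize
    // the Catalyst schema to JSON and stash it in the Hadoop configuration.
    val conf = new Configuration()
    conf.set(key, schema.json)

    // Consumer side (what init() does above): rebuild the StructType from JSON.
    val restored = DataType.fromJson(conf.get(key)).asInstanceOf[StructType]
    assert(restored == schema)
  }
}
```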
@@ -424,7 +424,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
   }
 }
 
-private[parquet] object CatalystWriteSupport {
+private[parquet] object ParquetWriteSupport {
   val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
 
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
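For readers who do not know the parquet-mr side of the renamed class: `ParquetWriteSupport` extends `org.apache.parquet.hadoop.api.WriteSupport[InternalRow]`, whose contract is `init()` (return the Parquet schema plus extra file metadata), `prepareForWrite()` (receive the low-level `RecordConsumer`), and `write()` (emit one record). The sketch below is a deliberately tiny implementation of that contract for `(Int, String)` pairs; it is not Spark's code, and `PairWriteSupport` plus its schema are made up for illustration.

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

// Toy write support for (id: Int, name: String) records, illustrating the
// lifecycle ParquetWriteSupport implements for InternalRow.
class PairWriteSupport extends WriteSupport[(Int, String)] {
  private var consumer: RecordConsumer = _

  private val schema: MessageType = MessageTypeParser.parseMessageType(
    """message pair {
      |  required int32 id;
      |  required binary name (UTF8);
      |}""".stripMargin)

  // Called once per task: declare the Parquet schema and any extra key/value
  // metadata to embed in the file footer. ParquetWriteSupport additionally
  // reads the Catalyst schema out of the Configuration here.
  override def init(conf: Configuration): WriteContext =
    new WriteContext(schema, Map.empty[String, String].asJava)

  // Called before the first record, handing over the record consumer.
  override def prepareForWrite(recordConsumer: RecordConsumer): Unit =
    consumer = recordConsumer

  // Called once per record: walk the fields in order and emit primitive values.
  override def write(record: (Int, String)): Unit = {
    consumer.startMessage()

    consumer.startField("id", 0)
    consumer.addInteger(record._1)
    consumer.endField("id", 0)

    consumer.startField("name", 1)
    consumer.addBinary(Binary.fromString(record._2))
    consumer.endField("name", 1)

    consumer.endMessage()
  }
}
```

Spark's real implementation builds the Parquet `MessageType` from the Catalyst `StructType` via its schema converter and writes `InternalRow` fields through the per-field writer functions mentioned above, but the three-step lifecycle is the same.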