From d8f45408635d4fccac557cb1e877dfe9267fb326 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Thu, 31 Aug 2017 08:16:58 +0900
Subject: [PATCH] [SPARK-21839][SQL] Support SQL config for ORC compression

## What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too.

## How was this patch tested?

Pass the Jenkins with new and updated test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19055 from dongjoon-hyun/SPARK-21839.
---
 python/pyspark/sql/readwriter.py              |  5 ++--
 .../apache/spark/sql/internal/SQLConf.scala   | 10 +++++++
 .../apache/spark/sql/DataFrameWriter.scala    |  8 +++--
 .../spark/sql/hive/orc/OrcFileFormat.scala    |  2 +-
 .../spark/sql/hive/orc/OrcOptions.scala       | 18 +++++++-----
 .../spark/sql/hive/orc/OrcSourceSuite.scala   | 29 +++++++++++++++++--
 6 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 01da0dc27d..cb847a0420 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -851,8 +851,9 @@ class DataFrameWriter(OptionUtils):
         :param partitionBy: names of partitioning columns
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, snappy, zlib, and lzo).
-                            This will override ``orc.compress``. If None is set, it uses the
-                            default value, ``snappy``.
+                            This will override ``orc.compress`` and
+                            ``spark.sql.orc.compression.codec``. If None is set, it uses the value
+                            specified in ``spark.sql.orc.compression.codec``.
 
         >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
         >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099505..c407874381 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -322,6 +322,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
+    .doc("Sets the compression codec used when writing ORC files. Acceptable values include: " +
+      "none, uncompressed, snappy, zlib, lzo.")
+    .stringConf
+    .transform(_.toLowerCase(Locale.ROOT))
+    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
+    .createWithDefault("snappy")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging {
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
+  def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cca93525d6..07347d2748 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    *
    * You can set the following ORC-specific option(s) for writing ORC files:
    * <ul>
-   * <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be
-   * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
-   * This will override `orc.compress`.</li>
+   * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
+   * compression codec to use when saving to file. This can be one of the known case-insensitive
+   * shorten names (`none`, `snappy`, `zlib`, and `lzo`). This will override
+   * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
+   * it overrides `spark.sql.orc.compression.codec`.</li>
    * </ul>
    *
    * @since 1.5.0
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 3a34ec55c8..edf2013a4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -68,7 +68,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val orcOptions = new OrcOptions(options)
+    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
 
     val configuration = job.getConfiguration
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 043eb69818..7f94c8c579 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -20,30 +20,34 @@ package org.apache.spark.sql.hive.orc
 import java.util.Locale
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[orc] class OrcOptions(
+    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient private val sqlConf: SQLConf)
   extends Serializable {
 
   import OrcOptions._
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], sqlConf: SQLConf) =
+    this(CaseInsensitiveMap(parameters), sqlConf)
 
   /**
-   * Compression codec to use. By default snappy compression.
+   * Compression codec to use.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    // `orc.compress` is a ORC configuration. So, here we respect this as an option but
-    // `compression` has higher precedence than `orc.compress`. It means if both are set,
-    // we will use `compression`.
+    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are
+    // in order of precedence from highest to lowest.
     val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
-      .getOrElse("snappy").toLowerCase(Locale.ROOT)
+      .getOrElse(sqlConf.orcCompressionCodec)
+      .toLowerCase(Locale.ROOT)
     if (!shortOrcCompressionCodecNames.contains(codecName)) {
       val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
       throw new IllegalArgumentException(s"Codec [$codecName] " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 52fa401d32..781de6631f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,8 +22,8 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -149,7 +149,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+    val conf = sqlContext.sessionState.conf
+    assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
   }
 
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -194,6 +195,30 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
       Utils.deleteRecursively(location)
     }
   }
+
+  test("SPARK-21839: Add SQL config for ORC compression") {
+    val conf = sqlContext.sessionState.conf
+    // Test if the default of spark.sql.orc.compression.codec is snappy
+    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+
+    // OrcOptions's parameters have a higher priority than SQL configuration.
+    // `compression` -> `orc.compress` -> `spark.sql.orc.compression.codec`
+    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
+      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+      val map1 = Map("orc.compress" -> "zlib")
+      val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
+      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+    }
+
+    // Test all the valid options of spark.sql.orc.compression.codec
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
+      withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
+        val expected = if (c == "UNCOMPRESSED") "NONE" else c
+        assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {
-- 
GitLab
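
For reference, here is a minimal usage sketch of the precedence chain this patch introduces. It is not part of the patch: it assumes a `SparkSession` named `spark` with Hive support enabled (this ORC code path lives under `sql/hive`) and hypothetical `/tmp` output paths; the config key, write options, and codec values are the ones shown in the diff above.

```scala
// Session-wide default for ORC writes (lowest precedence).
spark.conf.set("spark.sql.orc.compression.codec", "zlib")

// No per-write option is given, so the SQL config applies and the files are ZLIB-compressed.
spark.range(10).write.orc("/tmp/orc_zlib")

// Per-write options take precedence over the SQL config:
// `compression` > `orc.compress` > `spark.sql.orc.compression.codec`.
spark.range(10)
  .write
  .option("orc.compress", "none")    // overrides the SQL config
  .option("compression", "snappy")   // overrides both of the above
  .orc("/tmp/orc_snappy")
```

The accepted values for `spark.sql.orc.compression.codec` are the ones checked by the new conf entry: `none`, `uncompressed`, `snappy`, `zlib`, `lzo` (case-insensitive, with `uncompressed` resolving to the `NONE` codec, as the new test asserts).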