Commit 80ebca62 authored by Reynold Xin, committed by Herman van Hovell

[SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalyst (branch-2.1)

## What changes were proposed in this pull request?
This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, it keeps CatalystConf as a type alias for SQLConf and SimpleCatalystConf as a concrete class that extends SQLConf.

The motivation for the change is that keeping SQLConf only in sql/core forces us to duplicate, via CatalystConf, the config options that affect the optimizer and analyzer in sql/catalyst.

This is a backport into branch-2.1 to minimize merge conflicts.
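For illustration, the compatibility shim described above boils down to roughly the following self-contained sketch (simplified names and behavior, not the actual Spark sources):

```scala
// Minimal sketch of the shim: SQLConf lives in catalyst, CatalystConf survives as an alias,
// and SimpleCatalystConf stays around as a convenience subclass for tests.
object ConfShimSketch {
  // Stand-in for the moved SQLConf: a plain settings map with a typed accessor.
  class SQLConf {
    private val settings = scala.collection.mutable.Map[String, String]()
    def setConfString(key: String, value: String): Unit = settings(key) = value
    def caseSensitiveAnalysis: Boolean =
      settings.getOrElse("spark.sql.caseSensitive", "false").toBoolean
  }

  // The alias keeps existing references to CatalystConf compiling unchanged.
  type CatalystConf = SQLConf

  // Concrete subclass kept so tests that construct SimpleCatalystConf directly still work.
  case class SimpleCatalystConf(override val caseSensitiveAnalysis: Boolean) extends SQLConf

  def main(args: Array[String]): Unit = {
    val conf: CatalystConf = SimpleCatalystConf(caseSensitiveAnalysis = true)
    println(conf.caseSensitiveAnalysis) // true
  }
}
```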

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17301 from rxin/branch-2.1-conf.
parent a0ce845d
@@ -17,50 +17,22 @@
package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.internal.SQLConf
/**
* Interface for configuration options used in the catalyst module.
* A SQLConf that can be used for local testing. This class is only here to minimize the change
* for ticket SPARK-19944 (moves SQLConf from sql/core to sql/catalyst). This class should
* eventually be removed (test cases should just create SQLConf and set values appropriately).
*/
trait CatalystConf {
def caseSensitiveAnalysis: Boolean
def orderByOrdinal: Boolean
def groupByOrdinal: Boolean
def optimizerMaxIterations: Int
def optimizerInSetConversionThreshold: Int
def maxCaseBranchesForCodegen: Int
def runSQLonFile: Boolean
def warehousePath: String
/** If true, cartesian products between relations will be allowed for all
* join types (inner, (left|right|full) outer).
* If false, cartesian products will require explicit CROSS JOIN syntax.
*/
def crossJoinEnabled: Boolean
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
*/
def resolver: Resolver = {
if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
}
}
/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(
caseSensitiveAnalysis: Boolean,
orderByOrdinal: Boolean = true,
groupByOrdinal: Boolean = true,
optimizerMaxIterations: Int = 100,
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
runSQLonFile: Boolean = true,
crossJoinEnabled: Boolean = false,
warehousePath: String = "/user/hive/warehouse")
extends CatalystConf
override val caseSensitiveAnalysis: Boolean,
override val orderByOrdinal: Boolean = true,
override val groupByOrdinal: Boolean = true,
override val optimizerMaxIterations: Int = 100,
override val optimizerInSetConversionThreshold: Int = 10,
override val maxCaseBranchesForCodegen: Int = 20,
override val runSQLonFile: Boolean = true,
override val crossJoinEnabled: Boolean = false,
override val warehousePath: String = "/user/hive/warehouse")
extends SQLConf
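As an aside, the `Resolver` returned by the `resolver` method above (removed here and re-added inside SQLConf further down) is just a function that decides whether two identifiers match. A stand-alone sketch of that behavior, under the assumption that Catalyst's Resolver has the shape `(String, String) => Boolean`:

```scala
// Stand-alone illustration of case-sensitive vs. case-insensitive identifier resolution.
object ResolverSketch {
  // Assumed shape of Catalyst's Resolver: a predicate over two identifiers.
  type Resolver = (String, String) => Boolean

  val caseSensitiveResolution: Resolver = (a, b) => a == b
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  def resolver(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution

  def main(args: Array[String]): Unit = {
    println(resolver(caseSensitiveAnalysis = false)("colA", "cola")) // true
    println(resolver(caseSensitiveAnalysis = true)("colA", "cola"))  // false
  }
}
```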
@@ -17,6 +17,8 @@
package org.apache.spark.sql
import org.apache.spark.sql.internal.SQLConf
/**
* Catalyst is a library for manipulating relational query plans. All classes in catalyst are
* considered an internal API to Spark SQL and are subject to change between minor releases.
@@ -29,4 +31,9 @@ package object catalyst {
*/
protected[sql] object ScalaReflectionLock
/**
* This class is only here to minimize the change for ticket SPARK-19944
* (moves SQLConf from sql/core to sql/catalyst). This class should eventually be removed.
*/
type CatalystConf = SQLConf
}
@@ -24,15 +24,11 @@ import scala.collection.JavaConverters._
import scala.collection.immutable
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.analysis.Resolver
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -240,7 +236,7 @@ object SQLConf {
"of org.apache.parquet.hadoop.ParquetOutputCommitter.")
.internal()
.stringConf
.createWithDefault(classOf[ParquetOutputCommitter].getName)
.createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
val PARQUET_VECTORIZED_READER_ENABLED =
SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
@@ -406,7 +402,8 @@ object SQLConf {
SQLConfigBuilder("spark.sql.sources.commitProtocolClass")
.internal()
.stringConf
.createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
.createWithDefault(
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
@@ -552,7 +549,7 @@ object SQLConf {
SQLConfigBuilder("spark.sql.streaming.commitProtocolClass")
.internal()
.stringConf
.createWithDefault(classOf[ManifestFileCommitProtocol].getName)
.createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")
val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
.internal()
@@ -658,7 +655,7 @@ object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
class SQLConf extends Serializable with Logging {
import SQLConf._
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -761,6 +758,18 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
*/
def resolver: Resolver = {
if (caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
}
}
def subexpressionEliminationEnabled: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
@@ -818,7 +827,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
@@ -830,11 +839,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
/** ********************** SQLConf functionality methods ************ */
@@ -956,55 +965,3 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
settings.clear()
}
}
/**
* Static SQL configuration is a cross-session, immutable Spark configuration. External users can
* see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
*/
object StaticSQLConf {
val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
private def buildConf(key: String): ConfigBuilder = {
ConfigBuilder(key).onCreate { entry =>
globalConfKeys.add(entry.key)
SQLConf.register(entry)
}
}
val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
.stringConf
.createWithDefault(Utils.resolveURI("spark-warehouse").toString)
val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")
// This is used to control when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
// value of this property). We will split the JSON string of a schema if its length exceeds the
// threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
// that's why this conf has to be a static SQL conf.
val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
.doc("The maximum length allowed in a single cell when " +
"storing additional schema information in Hive's metastore.")
.internal()
.intConf
.createWithDefault(4000)
// When enabling the debug, Spark SQL internal table properties are not filtered out; however,
// some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
val DEBUG_MODE = buildConf("spark.sql.debug")
.internal()
.doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
.booleanConf
.createWithDefault(false)
}
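For orientation, the entries removed above (and re-added in the new file below) all follow the same define-an-entry, read-it-back-via-getConf pattern used throughout SQLConf. Here is a simplified, self-contained stand-in for that pattern, using a hypothetical key and a toy builder rather than Spark's real ConfigBuilder/ConfigEntry API:

```scala
// Toy version of the "define an entry with a default, read it back through the conf" pattern.
object ConfEntrySketch {
  final case class ConfEntry[T](key: String, default: T, fromString: String => T)

  class MiniConf {
    private val settings = scala.collection.mutable.Map[String, String]()
    def set(key: String, value: String): Unit = settings(key) = value
    def getConf[T](entry: ConfEntry[T]): T =
      settings.get(entry.key).map(entry.fromString).getOrElse(entry.default)
  }

  // Hypothetical key for illustration only; real entries are built with SQLConfigBuilder/buildConf.
  val EXAMPLE_FLAG = ConfEntry[Boolean]("spark.sql.example.flag", true, _.toBoolean)

  def main(args: Array[String]): Unit = {
    val conf = new MiniConf
    println(conf.getConf(EXAMPLE_FLAG)) // true (default)
    conf.set("spark.sql.example.flag", "false")
    println(conf.getConf(EXAMPLE_FLAG)) // false (explicit setting wins)
  }
}
```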
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.internal
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.util.Utils
/**
* Static SQL configuration is a cross-session, immutable Spark configuration. External users can
* see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
*/
object StaticSQLConf {
val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
private def buildConf(key: String): ConfigBuilder = {
ConfigBuilder(key).onCreate { entry =>
globalConfKeys.add(entry.key)
SQLConf.register(entry)
}
}
val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
.stringConf
.createWithDefault(Utils.resolveURI("spark-warehouse").toString)
val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")
// This is used to control when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
// value of this property). We will split the JSON string of a schema if its length exceeds the
// threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
// that's why this conf has to be a static SQL conf.
val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
.doc("The maximum length allowed in a single cell when " +
"storing additional schema information in Hive's metastore.")
.internal()
.intConf
.createWithDefault(4000)
// When enabling the debug, Spark SQL internal table properties are not filtered out; however,
// some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
val DEBUG_MODE = buildConf("spark.sql.debug")
.internal()
.doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
.booleanConf
.createWithDefault(false)
}
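Finally, as the doc comment above notes, static confs such as `spark.sql.warehouse.dir` are fixed for the lifetime of the shared session state. A typical usage sketch (hypothetical path and app name, standard SparkSession builder API):

```scala
import org.apache.spark.sql.SparkSession

// Static SQL confs have to be supplied before the session (and its shared state) is created;
// per the doc comment above, they remain visible through spark.conf but cannot be set/unset later.
object StaticConfUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("static-conf-example")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") // hypothetical path
      .getOrCreate()

    // Readable after startup; changing it afterwards is not supported for static confs.
    println(spark.conf.get("spark.sql.warehouse.dir"))

    spark.stop()
  }
}
```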