diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala deleted file mode 100644 index cff0efa97993215fe4bf1f98ecfa0e281e725fc5..0000000000000000000000000000000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst - -import java.util.TimeZone - -import org.apache.spark.sql.catalyst.analysis._ - -/** - * Interface for configuration options used in the catalyst module. - */ -trait CatalystConf { - def caseSensitiveAnalysis: Boolean - - def orderByOrdinal: Boolean - def groupByOrdinal: Boolean - - def optimizerMaxIterations: Int - def optimizerInSetConversionThreshold: Int - def maxCaseBranchesForCodegen: Int - - def tableRelationCacheSize: Int - - def runSQLonFile: Boolean - - def warehousePath: String - - def sessionLocalTimeZone: String - - /** If true, cartesian products between relations will be allowed for all - * join types(inner, (left|right|full) outer). - * If false, cartesian products will require explicit CROSS JOIN syntax. - */ - def crossJoinEnabled: Boolean - - /** - * Returns the [[Resolver]] for the current configuration, which can be used to determine if two - * identifiers are equal. - */ - def resolver: Resolver = { - if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution - } - - /** - * Enables CBO for estimation of plan statistics when set true. - */ - def cboEnabled: Boolean - - /** Enables join reorder in CBO. */ - def joinReorderEnabled: Boolean - - /** The maximum number of joined nodes allowed in the dynamic programming algorithm. */ - def joinReorderDPThreshold: Int - - override def clone(): CatalystConf = throw new CloneNotSupportedException() -} - - -/** A CatalystConf that can be used for local testing. 
*/ -case class SimpleCatalystConf( - caseSensitiveAnalysis: Boolean, - orderByOrdinal: Boolean = true, - groupByOrdinal: Boolean = true, - optimizerMaxIterations: Int = 100, - optimizerInSetConversionThreshold: Int = 10, - maxCaseBranchesForCodegen: Int = 20, - tableRelationCacheSize: Int = 1000, - runSQLonFile: Boolean = true, - crossJoinEnabled: Boolean = false, - cboEnabled: Boolean = false, - joinReorderEnabled: Boolean = false, - joinReorderDPThreshold: Int = 12, - warehousePath: String = "/user/hive/warehouse", - sessionLocalTimeZone: String = TimeZone.getDefault().getID) - extends CatalystConf { - - override def clone(): SimpleCatalystConf = this.copy() -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala new file mode 100644 index 0000000000000000000000000000000000000000..746f84459de26806a4037b536f87f33084b83370 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + +import java.util.TimeZone + +import org.apache.spark.sql.internal.SQLConf + + +/** + * A SQLConf that can be used for local testing. This class is only here to minimize the change + * for ticket SPARK-19944 (moves SQLConf from sql/core to sql/catalyst). This class should + * eventually be removed (test cases should just create SQLConf and set values appropriately). 
+ */ +case class SimpleCatalystConf( + override val caseSensitiveAnalysis: Boolean, + override val orderByOrdinal: Boolean = true, + override val groupByOrdinal: Boolean = true, + override val optimizerMaxIterations: Int = 100, + override val optimizerInSetConversionThreshold: Int = 10, + override val maxCaseBranchesForCodegen: Int = 20, + override val tableRelationCacheSize: Int = 1000, + override val runSQLonFile: Boolean = true, + override val crossJoinEnabled: Boolean = false, + override val cboEnabled: Boolean = false, + override val joinReorderEnabled: Boolean = false, + override val joinReorderDPThreshold: Int = 12, + override val warehousePath: String = "/user/hive/warehouse", + override val sessionLocalTimeZone: String = TimeZone.getDefault().getID) + extends SQLConf { + + override def clone(): SimpleCatalystConf = this.copy() +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala index 105cdf52500c6e92e57a933f5308211c81a4a56f..4af56afebb762ebe45683b8389b106c4456e3a9f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.internal.SQLConf + /** * Catalyst is a library for manipulating relational query plans. All classes in catalyst are * considered an internal API to Spark SQL and are subject to change between minor releases. @@ -29,4 +31,9 @@ package object catalyst { */ protected[sql] object ScalaReflectionLock + /** + * This type alias is only here to minimize the change for ticket SPARK-19944 + * (moves SQLConf from sql/core to sql/catalyst). It should eventually be removed. + */ + type CatalystConf = SQLConf } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala similarity index 91% rename from sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8e3f567b7dd9030274d8340c0698361e80337d26..315bedb12e71620e7730ae45772f236ee40ccffd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -24,15 +24,11 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import org.apache.hadoop.fs.Path -import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit -import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol -import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol -import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.analysis.Resolver //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the configuration options for Spark SQL.
@@ -251,7 +247,7 @@ object SQLConf { "of org.apache.parquet.hadoop.ParquetOutputCommitter.") .internal() .stringConf - .createWithDefault(classOf[ParquetOutputCommitter].getName) + .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter") val PARQUET_VECTORIZED_READER_ENABLED = buildConf("spark.sql.parquet.enableVectorizedReader") @@ -417,7 +413,8 @@ object SQLConf { buildConf("spark.sql.sources.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName) + .createWithDefault( + "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold") @@ -578,7 +575,7 @@ object SQLConf { buildConf("spark.sql.streaming.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[ManifestFileCommitProtocol].getName) + .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol") val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD = buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold") @@ -723,7 +720,7 @@ object SQLConf { * * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ -private[sql] class SQLConf extends Serializable with CatalystConf with Logging { +class SQLConf extends Serializable with Logging { import SQLConf._ /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */ @@ -833,6 +830,18 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) + /** + * Returns the [[Resolver]] for the current configuration, which can be used to determine if two + * identifiers are equal. 
+ */ + def resolver: Resolver = { + if (caseSensitiveAnalysis) { + org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution + } else { + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution + } + } + def subexpressionEliminationEnabled: Boolean = getConf(SUBEXPRESSION_ELIMINATION_ENABLED) @@ -890,7 +899,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES) - override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES) + def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES) def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP) @@ -907,21 +916,21 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def hiveThriftServerSingleSession: Boolean = getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION) - override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) + def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) - override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) + def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) - override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED) + def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED) - override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE) + def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) - override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED) + def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED) - override def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED) + def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED) - override def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD) + def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD) /** ********************** SQLConf functionality methods ************ */ @@ -1050,66 +1059,3 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { result } } - -/** - * Static SQL configuration is a cross-session, immutable Spark configuration. External users can - * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them. - */ -object StaticSQLConf { - - import SQLConf.buildStaticConf - - val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir") - .doc("The default location for managed databases and tables.") - .stringConf - .createWithDefault(Utils.resolveURI("spark-warehouse").toString) - - val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation") - .internal() - .stringConf - .checkValues(Set("hive", "in-memory")) - .createWithDefault("in-memory") - - val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase") - .internal() - .stringConf - .createWithDefault("global_temp") - - // This is used to control when we will split a schema's JSON string to multiple pieces - // in order to fit the JSON string in metastore's table property (by default, the value has - // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default - // value of this property). We will split the JSON string of a schema to its length exceeds the - // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session, - // that's why this conf has to be a static SQL conf. 
- val SCHEMA_STRING_LENGTH_THRESHOLD = - buildStaticConf("spark.sql.sources.schemaStringLengthThreshold") - .doc("The maximum length allowed in a single cell when " + - "storing additional schema information in Hive's metastore.") - .internal() - .intConf - .createWithDefault(4000) - - val FILESOURCE_TABLE_RELATION_CACHE_SIZE = - buildStaticConf("spark.sql.filesourceTableRelationCacheSize") - .internal() - .doc("The maximum size of the cache that maps qualified table names to table relation plans.") - .intConf - .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative") - .createWithDefault(1000) - - // When enabling the debug, Spark SQL internal table properties are not filtered out; however, - // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly. - val DEBUG_MODE = buildStaticConf("spark.sql.debug") - .internal() - .doc("Only used for internal debugging. Not all functions are supported when it is enabled.") - .booleanConf - .createWithDefault(false) - - val HIVE_THRIFT_SERVER_SINGLESESSION = - buildStaticConf("spark.sql.hive.thriftServer.singleSession") - .doc("When set to true, Hive Thrift server is running in a single session mode. " + - "All the JDBC/ODBC connections share the temporary views, function registries, " + - "SQL configuration and the current database.") - .booleanConf - .createWithDefault(false) -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala new file mode 100644 index 0000000000000000000000000000000000000000..af1a9cee2962ac1284d5a2401d7f1cbe32309a09 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.internal + +import org.apache.spark.util.Utils + + +/** + * Static SQL configuration is a cross-session, immutable Spark configuration. External users can + * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them. 
*/ +object StaticSQLConf { + + import SQLConf.buildStaticConf + + val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir") + .doc("The default location for managed databases and tables.") + .stringConf + .createWithDefault(Utils.resolveURI("spark-warehouse").toString) + + val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation") + .internal() + .stringConf + .checkValues(Set("hive", "in-memory")) + .createWithDefault("in-memory") + + val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase") + .internal() + .stringConf + .createWithDefault("global_temp") + + // This is used to control when we will split a schema's JSON string into multiple pieces + // in order to fit the JSON string in the metastore's table property (by default, the value has + // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default + // value of this property). We will split the JSON string of a schema if its length exceeds the + // threshold. Note that this conf is only read in HiveExternalCatalog, which is cross-session; + // that's why this conf has to be a static SQL conf. + val SCHEMA_STRING_LENGTH_THRESHOLD = + buildStaticConf("spark.sql.sources.schemaStringLengthThreshold") + .doc("The maximum length allowed in a single cell when " + + "storing additional schema information in Hive's metastore.") + .internal() + .intConf + .createWithDefault(4000) + + val FILESOURCE_TABLE_RELATION_CACHE_SIZE = + buildStaticConf("spark.sql.filesourceTableRelationCacheSize") + .internal() + .doc("The maximum size of the cache that maps qualified table names to table relation plans.") + .intConf + .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative") + .createWithDefault(1000) + + // When debug mode is enabled, Spark SQL internal table properties are not filtered out; however, + // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly. + val DEBUG_MODE = buildStaticConf("spark.sql.debug") + .internal() + .doc("Only used for internal debugging. Not all functions are supported when it is enabled.") + .booleanConf + .createWithDefault(false) + + val HIVE_THRIFT_SERVER_SINGLESESSION = + buildStaticConf("spark.sql.hive.thriftServer.singleSession") + .doc("When set to true, Hive Thrift server is running in a single session mode. " + + "All the JDBC/ODBC connections share the temporary views, function registries, " + + "SQL configuration and the current database.") + .booleanConf + .createWithDefault(false) +}
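A note for reviewers on the SimpleCatalystConf shim: its doc comment says test cases should eventually create SQLConf directly and set values on it. A minimal sketch of both styles, using the resolver method added to SQLConf in this patch (the snippet is illustrative only and not part of the change):

    import org.apache.spark.sql.catalyst.SimpleCatalystConf
    import org.apache.spark.sql.internal.SQLConf

    // Current style: the case-class shim keeps existing test call sites compiling.
    val shim = SimpleCatalystConf(caseSensitiveAnalysis = true)
    assert(shim.resolver("Col", "Col"))    // case-sensitive: exact match only
    assert(!shim.resolver("Col", "col"))

    // Eventual style: plain SQLConf (now public after this patch) with values set explicitly.
    val conf = new SQLConf
    conf.setConf(SQLConf.CASE_SENSITIVE, false)
    assert(conf.resolver("Col", "col"))    // case-insensitive resolution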
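On the classOf[...].getName defaults becoming string literals (ParquetOutputCommitter, SQLHadoopMapReduceCommitProtocol, ManifestFileCommitProtocol): those classes live in sql/core or in parquet-hadoop, neither of which sql/catalyst can depend on after the move. Nothing is lost, since the config value was always just a class name that the consuming side resolves reflectively. Roughly as below; this is a simplified sketch, and Spark's own lookup goes through org.apache.spark.util.Utils rather than raw Class.forName:

    val conf = new org.apache.spark.sql.internal.SQLConf

    // The default is only a fully qualified name, so sql/catalyst never links
    // against the class it refers to.
    val className = conf.getConfString("spark.sql.sources.commitProtocolClass")

    // sql/core loads it where the commit protocol is actually needed.
    val committerClass = Class.forName(className)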
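The long comment above SCHEMA_STRING_LENGTH_THRESHOLD, in code form: a schema's JSON representation is chunked so that each table property value stays under the metastore's 4000-character limit. Hypothetical sketch; the real splitting, including the exact part-key naming, lives in HiveExternalCatalog:

    import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val conf = new SQLConf
    val threshold = conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)  // 4000 by default

    val schemaJson = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType))).json

    // One table property per chunk; each value fits the metastore's length restriction.
    val parts = schemaJson.grouped(threshold).toSeq.zipWithIndex.map {
      case (part, i) => s"spark.sql.sources.schema.part.$i" -> part
    }.toMap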