diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 0f5c8a9e02ab85ea2e96c5ac154a407c98d4012b..a177e66645c7d1957843336d10d270999a08de70 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -90,6 +90,14 @@ private[spark] class TypedConfigBuilder[T](
     new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
   }
 
+  /** Checks if the user-provided value for the config matches the validator. */
+  def checkValue(validator: T => Boolean, errorMsg: String): TypedConfigBuilder[T] = {
+    transform { v =>
+      if (!validator(v)) throw new IllegalArgumentException(errorMsg)
+      v
+    }
+  }
+
   /** Check that user-provided values for the config match a pre-defined set. */
   def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
     transform { v =>
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 91a96bdda68336bea414f03fccc59dad6b851983..71eed464880b5cbcf1564c8605678f8dd2d4bc03 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -128,6 +128,28 @@ class ConfigEntrySuite extends SparkFunSuite {
     assert(conf.get(transformationConf) === "bar")
   }
 
+  test("conf entry: checkValue()") {
+    def createEntry(default: Int): ConfigEntry[Int] =
+      ConfigBuilder(testKey("checkValue"))
+        .intConf
+        .checkValue(value => value >= 0, "value must be non-negative")
+        .createWithDefault(default)
+
+    val conf = new SparkConf()
+
+    val entry = createEntry(10)
+    conf.set(entry, -1)
+    val e1 = intercept[IllegalArgumentException] {
+      conf.get(entry)
+    }
+    assert(e1.getMessage == "value must be non-negative")
+
+    val e2 = intercept[IllegalArgumentException] {
+      createEntry(-1)
+    }
+    assert(e2.getMessage == "value must be non-negative")
+  }
+
   test("conf entry: valid values check") {
     val conf = new SparkConf()
     val enum = ConfigBuilder(testKey("enum"))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 0b6fa5646970403e6dcf2988da098f52b5daf221..5f50ce1ba68ff1f2c33341fbc5e89c8df3347322 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -34,6 +34,8 @@ trait CatalystConf {
   def optimizerInSetConversionThreshold: Int
   def maxCaseBranchesForCodegen: Int
 
+  def tableRelationCacheSize: Int
+
   def runSQLonFile: Boolean
 
   def warehousePath: String
@@ -69,6 +71,7 @@ case class SimpleCatalystConf(
     optimizerMaxIterations: Int = 100,
     optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20,
+    tableRelationCacheSize: Int = 1000,
     runSQLonFile: Boolean = true,
     crossJoinEnabled: Boolean = false,
     cboEnabled: Boolean = false,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e9543f79878b7be955fefe4c836143d5b34b0b95..dd0c5cb7066f50bcf69c3c3baaa6a61077ed9484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -118,11 +118,11 @@ class SessionCatalog(
   }
 
   /**
-   * A cache of qualified table name to table relation plan.
+   * A cache of qualified table names to table relation plans.
    */
   val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
-    // TODO: create a config instead of hardcode 1000 here.
-    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
+    val cacheSize = conf.tableRelationCacheSize
+    CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8c77da1763f8fa5251ed90a4954cb7600895c97c..dc0f1304069326e4fb66feda45882a0fc4b925e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -786,6 +786,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
+  def tableRelationCacheSize: Int =
+    getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
+
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
@@ -1034,6 +1037,14 @@ object StaticSQLConf {
       .intConf
       .createWithDefault(4000)
 
+  val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
+    buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
+      .internal()
+      .doc("The maximum size of the cache that maps qualified table names to table relation plans.")
+      .intConf
+      .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
+      .createWithDefault(1000)
+
   // When enabling the debug, Spark SQL internal table properties are not filtered out; however,
   // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
   val DEBUG_MODE = buildStaticConf("spark.sql.debug")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 6c12f0ff7d00cac5c6ea5551bdca28c3d6cb955e..0e3a5ca9d71ddc114ecb10b8f741c47786a942c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -171,4 +171,20 @@ class SQLConfEntrySuite extends SparkFunSuite {
       buildConf(key).stringConf.createOptional
     }
   }
+
+  test("StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE") {
+    val confEntry = StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE
+    assert(conf.getConf(confEntry) === 1000)
+
+    conf.setConf(confEntry, -1)
+    val e1 = intercept[IllegalArgumentException] {
+      conf.getConf(confEntry)
+    }
+    assert(e1.getMessage === "The maximum size of the cache must not be negative")
+
+    val e2 = intercept[IllegalArgumentException] {
+      conf.setConfString(confEntry.key, "-1")
+    }
+    assert(e2.getMessage === "The maximum size of the cache must not be negative")
+  }
 }
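
Usage note (not part of the patch): because checkValue is layered on transform, the validator fires wherever the string converter runs, i.e. when reading a user-set value, when setConfString eagerly converts a string, and when createWithDefault round-trips the default, which is what the e2 cases in the two new tests exercise. A minimal sketch of the new hook, assuming a hypothetical key "spark.test.threshold" and illustrative values (ConfigBuilder and SparkConf.get(entry) are private[spark], so this only compiles inside the org.apache.spark package):

  import org.apache.spark.SparkConf
  import org.apache.spark.internal.config.ConfigBuilder

  // Define an int config whose value must be positive. The check applies to
  // user-supplied values and to the default alike, so a bad default fails
  // already at entry-creation time.
  val THRESHOLD = ConfigBuilder("spark.test.threshold")
    .intConf
    .checkValue(v => v > 0, "threshold must be positive")
    .createWithDefault(64)

  val conf = new SparkConf().set(THRESHOLD.key, "0")
  conf.get(THRESHOLD)  // throws IllegalArgumentException: threshold must be positive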