Skip to content
Snippets Groups Projects
Commit a8d56f53 authored by Sandeep Singh's avatar Sandeep Singh Committed by Andrew Or
Browse files

[SPARK-14422][SQL] Improve handling of optional configs in SQLConf

## What changes were proposed in this pull request?
Create a new API for handling Optional Configs in SQLConf.
Right now `getConf` for `OptionalConfigEntry[T]` returns a value of type `T` and throws an exception if the key doesn't exist. Add a new method `getOptionalConf` (suggestions on naming welcome) which returns a value of type `Option[T]` (so if the key doesn't exist it returns `None`).

## How was this patch tested?
Add test and ran tests locally.

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #12846 from techaddict/SPARK-14422.
parent c4e0fde8
No related branches found
No related tags found
No related merge requests found
......@@ -296,7 +296,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
new Path(userSpecified).toUri.toString
}.orElse {
val checkpointConfig: Option[String] =
df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None)
df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION)
checkpointConfig.map { location =>
new Path(location, queryName).toUri.toString
......@@ -334,9 +334,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
partitionColumns = normalizedParCols.getOrElse(Nil))
val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
new Path(df.sparkSession.sessionState.conf.checkpointLocation, queryName).toUri.toString
})
val checkpointLocation = extraOptions.getOrElse("checkpointLocation",
new Path(df.sparkSession.sessionState.conf.checkpointLocation.get, queryName).toUri.toString
)
df.sparkSession.sessionState.continuousQueryManager.startQuery(
queryName,
checkpointLocation,
......
......@@ -17,7 +17,7 @@
package org.apache.spark.sql
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.sql.internal.SQLConf
......@@ -86,6 +86,10 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
sqlConf.getConf(entry)
}
/**
 * Returns the value of the given optional Spark SQL configuration entry,
 * or `None` when the entry has not been set.
 */
protected[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = sqlConf.getConf(entry)
/**
* Returns the value of Spark runtime configuration property for the given key.
*/
......
......@@ -546,7 +546,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
/**
 * The configured checkpoint location for continuous queries, if any.
 * Returns `None` when `CHECKPOINT_LOCATION` has not been set; callers decide how
 * to handle the absent case instead of this accessor throwing.
 */
def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
......@@ -717,12 +717,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
/**
 * Return the value of an optional Spark SQL configuration property for the given key. If the key
 * is not set yet, returns None.
 *
 * @param entry the registered optional configuration entry to look up
 * @return `Some(value)` converted via the entry's raw-value converter, or `None` when unset
 */
def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
  require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
  // Option(...) maps an absent (null) setting to None instead of throwing
  // NoSuchElementException, so callers can handle the unset case explicitly.
  Option(settings.get(entry.key)).map(entry.rawValueConverter)
}
/**
......
......@@ -153,6 +153,17 @@ class SQLConfEntrySuite extends SparkFunSuite {
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
}
test("optionalConf") {
  val key = "spark.sql.SQLConfEntrySuite.optional"
  val optionalEntry = SQLConfigBuilder(key).stringConf.createOptional
  // An optional conf that was never set resolves to None instead of throwing.
  assert(conf.getConf(optionalEntry) === None)
  conf.setConfString(key, "a")
  assert(conf.getConf(optionalEntry) === Some("a"))
}
test("duplicate entry") {
val key = "spark.sql.SQLConfEntrySuite.duplicate"
SQLConfigBuilder(key).stringConf.createOptional
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment