diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 53d732403f979c9e14b833cba92273112e40074a..6c3fe07709fa3d87f457fbe1853a555b3aa368c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -313,7 +313,7 @@ class RelationalGroupedDataset protected[sql](
    */
   def pivot(pivotColumn: String): RelationalGroupedDataset = {
     // This is to prevent unintended OOM errors when the number of distinct values is large
-    val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+    val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
     // Get the distinct values of the column and sort them so its consistent
     val values = df.select(pivotColumn)
       .distinct()
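Every hunk in this patch applies the same substitution; the pivot change above shows it in its simplest form. A side-by-side sketch of that one call site, for readers skimming the rest:

  // before (removed): entry-based lookup through the session's RuntimeConfig
  //   val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
  // after (added): typed accessor reached through the session state's SQLConf
  val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues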
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index d4845637be049090ab60f2f2f36340a44bd7f604..383b3a233fc27bed0aa3779955b98b2b59bd9129 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -55,7 +55,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }
 
   def assertSupported(): Unit = {
-    if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
       UnsupportedOperationChecker.checkForBatch(analyzed)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 825c01365dd1e02ee206a42435b5e487552f6ad6..93154bd2ca69cc8d713cd3f9cb886278f59cd436 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -231,7 +231,7 @@ case class DataSource(
           }
         }
 
-        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+        val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
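The check above means a streaming source other than text must be given a schema whenever streaming schema inference is disabled (its default). A hedged sketch of what that looks like from the caller's side; the `spark` session, the schema fields, and the input path are illustrative only:

  import org.apache.spark.sql.types.{LongType, StructType, TimestampType}

  // An explicit schema satisfies the userSpecifiedSchema requirement checked above.
  val eventSchema = new StructType().add("id", LongType).add("ts", TimestampType)
  val events = spark.readStream
    .schema(eventSchema)        // required while STREAMING_SCHEMA_INFERENCE is disabled
    .json("/data/events")       // hypothetical path; any non-text source behaves the same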
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 02ce7fab64729242c58505f4ae67cbafcc1c4bb6..99ca3df673568a2900ce55655290dfe291400638 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -131,7 +131,7 @@ case class InsertIntoHadoopFsRelationCommand(
             dataColumns = dataColumns,
             inputSchema = query.output,
             PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
+            sparkSession.sessionState.conf.partitionMaxFiles,
             isAppend)
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index cef9d4d9c7f1b333af5bb22e0c423e0395977d22..d2d5b56c82946974a3f12d01e227d992ad3c36e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileCatalog(
         PartitioningUtils.parsePartitions(
           leafDirs,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
           basePaths = basePaths)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9208c82179d8d913b54384822691c1c98d526783..e7c3545630fea5b6b9683ae1251ff174d1e59791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -151,7 +151,7 @@ class ParquetFileFormat
     // Should we merge schemas from all Parquet part-files?
     val shouldMergeSchemas = parquetOptions.mergeSchema
 
-    val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+    val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
 
     val filesByType = splitFiles(files)
 
@@ -308,14 +308,14 @@ class ParquetFileFormat
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
     // Try to push down filters when filter push-down is enabled.
     val pushed =
-      if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+      if (sparkSession.sessionState.conf.parquetFilterPushDown) {
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
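One detail worth noting in the push-down hunk: the old code fetched the raw string by key and parsed it at the call site, whereas `parquetFilterPushDown` (already exposed by SQLConf, since this patch does not need to add it) returns a typed Boolean directly:

  // before (removed): string lookup by key, parsed with .toBoolean at the call site
  //   if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { ... }
  // after (added): the accessor already returns Boolean
  if (sparkSession.sessionState.conf.parquetFilterPushDown) {
    // push convertible filters down to the Parquet reader
  }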
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 3eec582714e1582a90585732ff214b446fea36fc..615731889dfadcb486712defc83683547142f626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -52,7 +52,7 @@ private[parquet] class ParquetOptions(
   val mergeSchema: Boolean = parameters
     .get(MERGE_SCHEMA)
     .map(_.toBoolean)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+    .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 7520163522027411b43c1ff9db893a05a8d9b3df..6f9f7c18c4dc71664c7fcddf3461f7cd235c768b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -93,11 +93,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
    * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
    * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
    */
-  private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
 
-  private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+  private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
 
-  private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5e1e5eeb50936f755dbdd61dcbbb170a7c2fe5d6..a1aae61107baf53a9571ea08066c7487086ef345 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,7 +58,7 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index e55f63a6c8db82a4fef3bb2a52f85a79db9a722f..de72f1cf2723de6c4f935eaa295243804873ea7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -24,11 +24,9 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   def this() = this(new SQLConf)
 
-  import SQLConf._
+  val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
 
-  val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
-
-  val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
 }
 
 private[streaming] object StateStoreConf {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1d6ca5a965cbfddbf00aa0d6454ea216e3af47ef..428032b1fba838413a79c770c1100b4a8db515fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -338,11 +338,6 @@ object SQLConf {
       .intConf
       .createWithDefault(4000)
 
-  val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
-    .doc("When true, automatically discover data partitions.")
-    .booleanConf
-    .createWithDefault(true)
-
   val PARTITION_COLUMN_TYPE_INFERENCE =
     SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -391,8 +386,10 @@ object SQLConf {
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The degree of parallelism for schema merging and partition discovery of " +
-        "Parquet data sources.")
+      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
+        "of detected files exceeds this value during partition discovery, it tries to list the " +
+        "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
+        "LibSVM data sources.")
       .intConf
       .createWithDefault(32)
 
@@ -592,8 +589,24 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
 
+  def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+
+  def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
+  def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+
+  def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
+
+  def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+
+  def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
+
+  def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
+
+  def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
+
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
@@ -657,6 +670,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
 
+  def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
+
+  def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
+  def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
+
   def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
 
   def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
@@ -673,12 +692,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def convertCTAS: Boolean = getConf(CONVERT_CTAS)
 
-  def partitionDiscoveryEnabled(): Boolean =
-    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
-
-  def partitionColumnTypeInferenceEnabled(): Boolean =
+  def partitionColumnTypeInferenceEnabled: Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
+  def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
+
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
@@ -695,6 +713,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
+  def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
+
   override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
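The accessors added in this file all follow one convention: a def named after the config entry, returning the entry's value type and delegating to getConf. For any future entry the shape would be the same (the entry name below is hypothetical, shown only to illustrate the convention):

  // Hypothetical entry SOME_FUTURE_FLAG; not part of this patch.
  def someFutureFlag: Boolean = getConf(SOME_FUTURE_FLAG)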
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index bae7f56a23f81f34c470fad65011dded529b0ac7..bba7bc753eea90579ddbf39021add7c2919436ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -204,7 +204,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
         new Path(userSpecified).toUri.toString
       }.orElse {
-        df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+        df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
           new Path(location, name).toUri.toString
         }
       }.getOrElse {
@@ -232,7 +232,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       val analyzedPlan = df.queryExecution.analyzed
       df.queryExecution.assertAnalyzed()
 
-      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+      if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
         UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
       }
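The checkpoint-location hunk above resolves the location in order: an explicit user-specified path first, then the session-wide SQLConf.CHECKPOINT_LOCATION default combined with the query name, then whatever fallback the getOrElse branch provides (not shown in this hunk). A hedged sketch from the user's side, with illustrative stream, query name, and path:

  // An explicit checkpointLocation option takes precedence over the session default.
  val query = aggregatedStream.writeStream
    .queryName("events_agg")
    .outputMode("complete")
    .format("memory")
    .option("checkpointLocation", "/tmp/checkpoints/events_agg")
    .start()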