diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 64ebf0c9823464b5879b339f9284261cc4f66b63..7629369ab1379ec3b84f057836ddbafabd2b453f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -51,7 +52,7 @@ private[libsvm] class LibSVMOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
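
The change above only swaps the hardcoded UUID key for CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID; the part-file naming pattern itself is untouched. A minimal, self-contained sketch of that pattern, with made-up values and not part of the patch:

```scala
// Illustrative only: how getDefaultWorkFile assembles a part-file name from
// the task split id and the shared write-job UUID. Values below are made up.
object PartFileNameSketch {
  def partFileName(split: Int, uniqueWriteJobId: String, extension: String): String =
    f"part-r-$split%05d-$uniqueWriteJobId$extension"

  def main(args: Array[String]): Unit =
    // prints: part-r-00003-0f8c3c6e-example-uuid.libsvm
    println(partFileName(3, "0f8c3c6e-example-uuid", ".libsvm"))
}
```

The same pattern recurs in the CSV, JSON, text, Parquet and ORC writers changed below, each with its own extension.
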
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index deedb68a7817bd7a173027c43a2f49808562ae9e..4b9aab612e7c328570de176ea1983548443be6c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -256,15 +256,15 @@ case class CreateDataSourceTableAsSelectCommand(
 
 object CreateDataSourceTableUtils extends Logging {
 
-  // TODO: Actually replace usages with these variables (SPARK-15584)
-
   val DATASOURCE_PREFIX = "spark.sql.sources."
   val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
   val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
   val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
-  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema."
+  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
+  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
   val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
   val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
   val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
   val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
   val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
@@ -296,7 +296,7 @@ object CreateDataSourceTableUtils extends Logging {
       options: Map[String, String],
       isExternal: Boolean): Unit = {
     val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put("spark.sql.sources.provider", provider)
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
 
     // Saves optional user specified schema.  Serialized JSON schema string may be too long to be
     // stored into a single metastore SerDe property.  In this case, we split the JSON string and
@@ -306,34 +306,32 @@ object CreateDataSourceTableUtils extends Logging {
       val schemaJsonString = schema.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
       parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
       }
     }
 
     if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
-      tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
       partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
+        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
       }
     }
 
     if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
       val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
 
-      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
-      tableProperties.put("spark.sql.sources.schema.numBucketCols",
-        bucketColumnNames.length.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
       bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
+        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
       }
 
       if (sortColumnNames.nonEmpty) {
-        tableProperties.put("spark.sql.sources.schema.numSortCols",
-          sortColumnNames.length.toString)
+        tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
         sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-          tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
+          tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
         }
       }
     }
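
Since the constants introduced above mostly describe how a schema is persisted as metastore properties, a self-contained sketch of the layout the splitting code produces may help. The 8-character threshold is made up for illustration (the real code uses a configurable length threshold), and nothing here is part of the patch:

```scala
import scala.collection.mutable

// Sketch of the property layout written when a user-specified schema is split
// into numbered parts because a single metastore SerDe property would be too
// long. Key names mirror the constants above; the threshold is artificial.
object SchemaSplitSketch {
  val DATASOURCE_SCHEMA_NUMPARTS = "spark.sql.sources.schema.numParts"
  val DATASOURCE_SCHEMA_PART_PREFIX = "spark.sql.sources.schema.part."

  def main(args: Array[String]): Unit = {
    val schemaJsonString = """{"type":"struct","fields":[]}"""
    val threshold = 8 // made up; the real code reads a length threshold from SQLConf
    val parts = schemaJsonString.grouped(threshold).toSeq

    val tableProperties = new mutable.HashMap[String, String]
    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
    }
    tableProperties.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```
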
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15eba3b0110b6795c486766522ff38550a42778b..95bac949965775c8ba53b92aef257fd56dff9459 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
@@ -464,7 +464,7 @@ case class AlterTableSetLocationCommand(
 object DDLUtils {
 
   def isDatasourceTable(props: Map[String, String]): Boolean = {
-    props.contains("spark.sql.sources.provider")
+    props.contains(DATASOURCE_PROVIDER)
   }
 
   def isDatasourceTable(table: CatalogTable): Boolean = {
@@ -503,8 +503,7 @@ object DDLUtils {
   }
 
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.nonEmpty ||
-      table.properties.contains("spark.sql.sources.schema.numPartCols")
+    table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
   // A persisted data source table may not store its schema in the catalog. In this case, its schema
@@ -512,15 +511,15 @@ object DDLUtils {
   def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
     require(isDatasourceTable(metadata))
     val props = metadata.properties
-    if (props.isDefinedAt("spark.sql.sources.schema")) {
+    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
       // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using spark.sql.sources.schema any more, we still need to support it.
-      props.get("spark.sql.sources.schema").map(DataType.fromJson(_).asInstanceOf[StructType])
+      props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
     } else {
-      metadata.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
+      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
         val parts = (0 until numParts.toInt).map { index =>
-          val part = metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+          val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
           if (part == null) {
             throw new AnalysisException(
               "Could not read schema from the metastore because it is corrupted " +
@@ -543,7 +542,7 @@ object DDLUtils {
       numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
       index <- 0 until numCols.toInt
     } yield props.getOrElse(
-      s"spark.sql.sources.schema.${colType}Col.$index",
+      s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
       throw new AnalysisException(
         s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
       )
@@ -556,7 +555,7 @@ object DDLUtils {
 
   def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
     if (isDatasourceTable(metadata)) {
-      metadata.properties.get("spark.sql.sources.schema.numBuckets").map { numBuckets =>
+      metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
         BucketSpec(
           numBuckets.toInt,
           getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
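
To complement the write path in createDataSourceTables.scala, here is a sketch of the read path that DDLUtils.getSchemaFromTableProperties implements: concatenate the numbered parts back into one JSON string. The property values are hypothetical, and the legacy single-key fallback and corruption errors handled by the real code are omitted; this is not part of the patch:

```scala
// Sketch of reassembling a schema JSON string from its numbered parts,
// mirroring DDLUtils.getSchemaFromTableProperties. Property values are made up.
object SchemaReassemblySketch {
  def main(args: Array[String]): Unit = {
    val props = Map(
      "spark.sql.sources.schema.numParts" -> "2",
      "spark.sql.sources.schema.part.0" -> """{"type":"struct","fi""",
      "spark.sql.sources.schema.part.1" -> """elds":[]}""")

    val schemaJson = props.get("spark.sql.sources.schema.numParts").map { numParts =>
      (0 until numParts.toInt)
        .map(index => props(s"spark.sql.sources.schema.part.$index"))
        .mkString
    }
    println(schemaJson) // Some({"type":"struct","fields":[]})
  }
}
```
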
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d1024090d312d12c938118c74b449db2f5a96f19..2d6a3b48603d7ebc102745196187688f1ac474a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -443,7 +443,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     table.properties.filterNot {
       // Hides schema properties that hold user-defined schema, partition columns, and bucketing
       // information since they are already extracted and shown in other parts.
-      case (key, _) => key.startsWith("spark.sql.sources.schema")
+      case (key, _) => key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA)
     }.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
     }
@@ -860,7 +860,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
     val props = metadata.properties
 
-    builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
+    builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n"
 
     val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
       case (key, value) =>
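
The DescribeTableCommand change relies on all schema-related keys sharing the spark.sql.sources.schema prefix, which is exactly what the new DATASOURCE_SCHEMA constant captures. A tiny illustration with made-up table properties, not part of the patch:

```scala
// Every key under the spark.sql.sources.schema prefix is hidden from the
// DESCRIBE output, because schema, partitioning and bucketing information is
// already printed elsewhere. Properties below are illustrative only.
object HideSchemaPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val DATASOURCE_SCHEMA = "spark.sql.sources.schema"
    val properties = Map(
      "spark.sql.sources.provider" -> "parquet",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> "{...}",
      "EXTERNAL" -> "FALSE")

    properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_SCHEMA) }
      .foreach { case (key, value) => println(s"  $key\t$value") }
    // only spark.sql.sources.provider and EXTERNAL are printed
  }
}
```
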
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a3d87cd38bb8c4290da4c7606753df6fa96fc535..2b4786542c72f384bb04747969a599caa6cbc3fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -101,7 +101,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
         userSpecifiedSchema = userSpecifiedSchema,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
-        className = table.properties("spark.sql.sources.provider"),
+        className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
         options = options)
 
     LogicalRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 61dcbebd644f65129b769e2520282e5334fa04fb..f56b50a54385a4faaf6ee279a9b8c64b51d95928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -91,7 +92,7 @@ private[sql] abstract class BaseWriterContainer(
     // This UUID is sent to executor side together with the serialized `Configuration` object within
     // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
     // unique task output files.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+    job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
 
     // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
@@ -241,7 +242,7 @@ private[sql] class DefaultWriterContainer(
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
     executorSideSetup(taskContext)
     val configuration = taskAttemptContext.getConfiguration
-    configuration.set("spark.sql.sources.output.path", outputPath)
+    configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
     var writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
 
@@ -349,11 +350,10 @@ private[sql] class DynamicPartitionWriterContainer(
     val configuration = taskAttemptContext.getConfiguration
     val path = if (partitionColumns.nonEmpty) {
       val partitionPath = getPartitionString(key).getString(0)
-      configuration.set(
-        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
+      configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString)
       new Path(getWorkPath, partitionPath).toString
     } else {
-      configuration.set("spark.sql.sources.output.path", outputPath)
+      configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
       getWorkPath
     }
     val bucketId = getBucketIdFromKey(key)
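
The comment in BaseWriterContainer explains the handshake: the driver stamps the job configuration with a UUID, and the executors read the same value back when naming their output files. A simplified sketch, assuming only that hadoop-common is on the classpath (as it is in Spark builds); not part of the patch:

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration

// Sketch of the driver/executor UUID handshake. The key matches
// DATASOURCE_WRITEJOBUUID; everything else is simplified for illustration.
object WriteJobUuidSketch {
  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"

  def main(args: Array[String]): Unit = {
    // Driver side: stamp the job configuration before it is serialized and
    // shipped to the executors.
    val conf = new Configuration()
    conf.set(DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)

    // Executor side: every OutputWriter reads the same value back and embeds
    // it in its part-file names, so concurrent write jobs cannot collide.
    val uniqueWriteJobId = conf.get(DATASOURCE_WRITEJOBUUID)
    println(s"part files will embed $uniqueWriteJobId")
  }
}
```
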
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 9849484dcec859bb824f85eab8736fa28b720136..d72c8b9ac2e7cccb40d1e20cfb2611e904c70eee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.types._
 
@@ -168,7 +169,7 @@ private[sql] class CsvOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 35f247692ffdc6442e6b88886c2074e94f036dc0..c7c528119660cd791b5ebc92b3ced685da8c6a02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,8 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -170,7 +169,7 @@ private[json] class JsonOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b47d41e1661f0f9bda0bdea5966a5c36eea96ec5..ff7962df22452b4c6b93e2bae0048046c62e7de4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -521,7 +522,8 @@ private[sql] class ParquetOutputWriter(
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
           val configuration = context.getConfiguration
-          val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+          val uniqueWriteJobId = configuration.get(
+            CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
           val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
           val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index d9525efe6d9be4a35b4b7b829d9a612ce1a751f8..1e5bce4a75978de458ebeeb41f391d276809facc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -119,7 +120,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e975756685dec5fbef786d4f84a2a3b6c2112405..ccb40064839068c524c2776bbd39a8edabe7788d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -387,7 +387,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
-      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
     }
   }
 
@@ -398,7 +398,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
-      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
       assert(DDLUtils.getSchemaFromTableProperties(table) ==
         Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
       assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
@@ -414,7 +414,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
-      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
       assert(DDLUtils.getSchemaFromTableProperties(table) ==
         Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
@@ -747,7 +747,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       catalog: SessionCatalog,
       tableIdent: TableIdentifier): Unit = {
     catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
-      properties = Map("spark.sql.sources.provider" -> "csv")))
+      properties = Map(DATASOURCE_PROVIDER -> "csv")))
   }
 
   private def testSetProperties(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ea721e4d9b6ece0a02d6754635aaa09ca8ab364c..ff395f39b705297790316a048b09fbad17d530fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,9 +75,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
 
         def schemaStringFromParts: Option[String] = {
-          table.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
+          table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
             val parts = (0 until numParts.toInt).map { index =>
-              val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+              val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
               if (part == null) {
                 throw new AnalysisException(
                   "Could not read schema from the metastore because it is corrupted " +
@@ -91,9 +92,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         }
 
         def getColumnNames(colType: String): Seq[String] = {
-          table.properties.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").map {
+          table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
             numCols => (0 until numCols.toInt).map { index =>
-              table.properties.getOrElse(s"spark.sql.sources.schema.${colType}Col.$index",
+              table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
                 throw new AnalysisException(
                   s"Could not read $colType columns from the metastore because it is corrupted " +
                     s"(missing part $index of it, $numCols parts are expected)."))
@@ -104,8 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
         // After SPARK-6024, we removed this flag.
         // Although we are not using spark.sql.sources.schema any more, we still need to support it.
-        val schemaString =
-          table.properties.get("spark.sql.sources.schema").orElse(schemaStringFromParts)
+        val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
 
         val userSpecifiedSchema =
           schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
@@ -115,7 +115,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         // from userSpecifiedSchema.
         val partitionColumns = getColumnNames("part")
 
-        val bucketSpec = table.properties.get("spark.sql.sources.schema.numBuckets").map { n =>
+        val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
           BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
         }
 
@@ -126,7 +126,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             userSpecifiedSchema = userSpecifiedSchema,
             partitionColumns = partitionColumns,
             bucketSpec = bucketSpec,
-            className = table.properties("spark.sql.sources.provider"),
+            className = table.properties(DATASOURCE_PROVIDER),
             options = options)
 
         LogicalRelation(
@@ -166,7 +166,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val qualifiedTableName = getQualifiedTableName(tableIdent)
     val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
 
-    if (table.properties.get("spark.sql.sources.provider").isDefined) {
+    if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
       val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
       val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable)
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
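
HiveMetastoreCatalog duplicates the DDLUtils logic (see the TODO noted above), so the same property conventions apply: the count of each column kind is stored under num<Kind>Cols and each name under <kind>Col.<index>, all below the spark.sql.sources.schema. prefix. A sketch of getColumnNames with hypothetical table properties, not part of the patch:

```scala
// Illustrative reconstruction of partition/bucket/sort column names from
// metastore properties, mirroring getColumnNames. Property values are made up.
object ColumnNamesSketch {
  val DATASOURCE_SCHEMA = "spark.sql.sources.schema"
  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."

  def getColumnNames(props: Map[String, String], colType: String): Seq[String] =
    props.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").toSeq.flatMap { numCols =>
      (0 until numCols.toInt).map(i => props(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$i"))
    }

  def main(args: Array[String]): Unit = {
    val props = Map(
      "spark.sql.sources.schema.numPartCols" -> "2",
      "spark.sql.sources.schema.partCol.0" -> "d",
      "spark.sql.sources.schema.partCol.1" -> "b")
    println(getColumnNames(props, "part")) // List(d, b)
  }
}
```
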
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index f1198179a0994fc684e4c21ee3e5bc32f9c49e75..0e8c37df88ee5e77f4b8796890a13768ac1b7080 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
@@ -217,7 +218,7 @@ private[orc] class OrcOutputWriter(
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-    val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
+    val uniqueWriteJobId = conf.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
     val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 1e6de463b3eba14064c6564e38711eb3ab3bfa92..2c50cc88cc4ccb902c6fab982a5900fcda5c621a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -700,7 +700,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
 
           // Manually create a metastore data source table.
-          CreateDataSourceTableUtils.createDataSourceTable(
+          createDataSourceTable(
             sparkSession = spark,
             tableIdent = TableIdentifier("wide_schema"),
             userSpecifiedSchema = Some(schema),
@@ -737,8 +737,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
         ),
         properties = Map(
-          "spark.sql.sources.provider" -> "json",
-          "spark.sql.sources.schema" -> schema.json,
+          DATASOURCE_PROVIDER -> "json",
+          DATASOURCE_SCHEMA -> schema.json,
           "EXTERNAL" -> "FALSE"))
 
       sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
@@ -762,13 +762,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
 
-      val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
+      val numPartCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt
       assert(numPartCols == 2)
 
       val actualPartitionColumns =
         StructType(
           (0 until numPartCols).map { index =>
-            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index"))
+            df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index"))
           })
       // Make sure partition columns are correctly stored in metastore.
       assert(
@@ -798,19 +798,19 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
 
-      val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+      val numBuckets = metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt
       assert(numBuckets == 8)
 
-      val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+      val numBucketCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt
       assert(numBucketCols == 2)
 
-      val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+      val numSortCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt
       assert(numSortCols == 1)
 
       val actualBucketByColumns =
         StructType(
           (0 until numBucketCols).map { index =>
-            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+            df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index"))
           })
       // Make sure bucketBy columns are correctly stored in metastore.
       assert(
@@ -821,7 +821,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val actualSortByColumns =
         StructType(
           (0 until numSortCols).map { index =>
-            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+            df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index"))
           })
       // Make sure sortBy columns are correctly stored in metastore.
       assert(
@@ -913,7 +913,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTempDir { tempPath =>
       val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
 
-      CreateDataSourceTableUtils.createDataSourceTable(
+      createDataSourceTable(
         sparkSession = spark,
         tableIdent = TableIdentifier("not_skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
@@ -928,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
         .forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType))
 
-      CreateDataSourceTableUtils.createDataSourceTable(
+      createDataSourceTable(
         sparkSession = spark,
         tableIdent = TableIdentifier("skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
@@ -960,10 +960,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
-        assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
-        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets"))
-        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols"))
-        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt === 1)
+        assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETS))
+        assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETCOLS))
+        assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS))
 
         checkAnswer(table("t"), Row(2, 1))
       }
@@ -984,10 +984,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
-        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
-        assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
-        assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
-        assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+        assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS))
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt === 1)
 
         checkAnswer(table("t"), Row(1, 2))
       }
@@ -1006,10 +1006,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
-        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
-        assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
-        assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
-        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+        assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS))
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+        assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS))
 
         checkAnswer(table("t"), Row(1, 2))
       }
@@ -1031,10 +1031,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
-        assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
-        assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
-        assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
-        assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt === 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt === 1)
 
         checkAnswer(table("t"), Row(2, 3, 1))
       }
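
Taken together, the assertions in this suite pin down the full set of schema-related properties a partitioned, bucketed and sorted data source table carries in the metastore. An illustrative summary with made-up column names, not part of the patch:

```scala
// One possible property layout for a table partitioned by "a", bucketed by
// "b" into 2 buckets and sorted by "c". Keys match the constants used above;
// the values are illustrative only.
object TablePropertyLayoutSketch {
  def main(args: Array[String]): Unit = {
    val expected = Map(
      "spark.sql.sources.provider" -> "parquet",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> "<schema JSON>",
      "spark.sql.sources.schema.numPartCols" -> "1",
      "spark.sql.sources.schema.partCol.0" -> "a",
      "spark.sql.sources.schema.numBuckets" -> "2",
      "spark.sql.sources.schema.numBucketCols" -> "1",
      "spark.sql.sources.schema.bucketCol.0" -> "b",
      "spark.sql.sources.schema.numSortCols" -> "1",
      "spark.sql.sources.schema.sortCol.0" -> "c")
    expected.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k = $v") }
  }
}
```
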
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 6f374d713bfc849733c3284b036a4b097c321b92..741abcb7513cbc54b5fb13ae04b2fc15936c4cc3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
@@ -101,24 +101,22 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
   test("show tblproperties of data source tables - basic") {
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1")
-        .filter(s"key = 'spark.sql.sources.provider'"),
-      Row("spark.sql.sources.provider", "org.apache.spark.sql.parquet.DefaultSource") :: Nil
+      sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = '$DATASOURCE_PROVIDER'"),
+      Row(DATASOURCE_PROVIDER, "org.apache.spark.sql.parquet.DefaultSource") :: Nil
     )
 
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1(spark.sql.sources.provider)"),
+      sql(s"SHOW TBLPROPERTIES parquet_tab1($DATASOURCE_PROVIDER)"),
       Row("org.apache.spark.sql.parquet.DefaultSource") :: Nil
     )
 
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1")
-        .filter(s"key = 'spark.sql.sources.schema.numParts'"),
-      Row("spark.sql.sources.schema.numParts", "1") :: Nil
+      sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = '$DATASOURCE_SCHEMA_NUMPARTS'"),
+      Row(DATASOURCE_SCHEMA_NUMPARTS, "1") :: Nil
     )
 
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1('spark.sql.sources.schema.numParts')"),
+      sql(s"SHOW TBLPROPERTIES parquet_tab1('$DATASOURCE_SCHEMA_NUMPARTS')"),
       Row("1"))
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0fa18414154ce32c0b07fda18f91ebbac55ec0bb..1fb777ade4b22a6a4fdeeaebbcffb988047af2b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -144,7 +145,7 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
     val configuration = context.getConfiguration
-    val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+    val uniqueWriteJobId = configuration.get(DATASOURCE_WRITEJOBUUID)
     val taskAttemptId = context.getTaskAttemptID
     val split = taskAttemptId.getTaskID.getId
     val name = FileOutputFormat.getOutputName(context)