diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d7fe6b32822a7b46b5c7385fe629b21da459e480..ee48baa59c7af63450850e0a9b707b7404f0fd09 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2659,7 +2659,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   # It makes sure that we can omit path argument in write.df API and then it calls
   # DataFrameWriter.save() without path.
   expect_error(write.df(df, source = "csv"),
-               "Error in save : illegal argument - 'path' is not specified")
+              "Error in save : illegal argument - Expected exactly one path to be specified")
   expect_error(write.json(df, jsonPath),
               "Error in json : analysis error - path file:.*already exists")
   expect_error(write.text(df, jsonPath),
@@ -2667,7 +2667,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   expect_error(write.orc(df, jsonPath),
               "Error in orc : analysis error - path file:.*already exists")
   expect_error(write.parquet(df, jsonPath),
-                            "Error in parquet : analysis error - path file:.*already exists")
+              "Error in parquet : analysis error - path file:.*already exists")
 
   # Arguments checking in R side.
   expect_error(write.df(df, "data.tmp", source = c(1, 2)),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f95c9f8cfa2d49c2c72319169cdee508d8bdcd58..ea675b76607d69b1283715d1e2ca220a9d809c6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -196,18 +196,32 @@ class InMemoryCatalog(
         throw new TableAlreadyExistsException(db = db, table = table)
       }
     } else {
-      if (tableDefinition.tableType == CatalogTableType.MANAGED) {
-        val dir = new Path(catalog(db).db.locationUri, table)
+      // Set the default table location if this is a managed table and its location is not
+      // specified.
+      // Ideally we should not create a managed table with location, but Hive serde table can
+      // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
+      // to create the table directory and write out data before we create this table, to avoid
+      // exposing a partial written table.
+      val needDefaultTableLocation =
+        tableDefinition.tableType == CatalogTableType.MANAGED &&
+          tableDefinition.storage.locationUri.isEmpty
+
+      val tableWithLocation = if (needDefaultTableLocation) {
+        val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
         try {
-          val fs = dir.getFileSystem(hadoopConfig)
-          fs.mkdirs(dir)
+          val fs = defaultTableLocation.getFileSystem(hadoopConfig)
+          fs.mkdirs(defaultTableLocation)
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to create table $table as failed " +
-              s"to create its directory $dir", e)
+              s"to create its directory $defaultTableLocation", e)
         }
+        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+      } else {
+        tableDefinition
       }
-      catalog(db).tables.put(table, new TableDesc(tableDefinition))
+
+      catalog(db).tables.put(table, new TableDesc(tableWithLocation))
     }
   }
 
@@ -218,8 +232,12 @@ class InMemoryCatalog(
       purge: Boolean): Unit = synchronized {
     requireDbExists(db)
     if (tableExists(db, table)) {
-      if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
-        val dir = new Path(catalog(db).db.locationUri, table)
+      val tableMeta = getTable(db, table)
+      if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        assert(tableMeta.storage.locationUri.isDefined,
+          "Managed table should always have table location, as we will assign a default location " +
+            "to it if it doesn't have one.")
+        val dir = new Path(tableMeta.storage.locationUri.get)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
@@ -244,7 +262,10 @@ class InMemoryCatalog(
     oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
 
     if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
-      val oldDir = new Path(catalog(db).db.locationUri, oldName)
+      assert(oldDesc.table.storage.locationUri.isDefined,
+        "Managed table should always have table location, as we will assign a default location " +
+          "to it if it doesn't have one.")
+      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -254,6 +275,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
             s"to rename its directory $oldDir", e)
       }
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
     }
 
     catalog(db).tables.put(newName, oldDesc)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 700f4835ac89aaa54d7ba5aaa5da70bdec9060c2..f95362e29228055e9dd2228c859573649e8dbd6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,7 +373,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val tableType = if (new CaseInsensitiveMap(extraOptions.toMap).contains("path")) {
+        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        val tableType = if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -382,7 +383,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val tableDesc = CatalogTable(
           identifier = tableIdent,
           tableType = tableType,
-          storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+          storage = storage,
           schema = new StructType,
           provider = Some(source),
           partitionColumnNames = partitioningColumns.getOrElse(Nil),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fe183d0097d03a3f245f2ea4ebbdd8e06c167cff..634ffde3543cb6bfed3cd484f462312f208cab5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -343,7 +343,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // TODO: this may be wrong for non file-based data source like JDBC, which should be external
     // even there is no `path` in options. We should consider allow the EXTERNAL keyword.
-    val tableType = if (new CaseInsensitiveMap(options).contains("path")) {
+    val storage = DataSource.buildStorageFormatFromOptions(options)
+    val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
@@ -352,7 +353,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val tableDesc = CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = CatalogStorageFormat.empty.copy(properties = options),
+      storage = storage,
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
       partitionColumnNames = partitionColumnNames,
@@ -1062,17 +1063,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         if (conf.convertCTAS && !hasStorageProperties) {
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
-          val optionsWithPath = if (location.isDefined) {
-            Map("path" -> location.get)
-          } else {
-            Map.empty[String, String]
-          }
-
           val newTableDesc = tableDesc.copy(
-            storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
-            provider = Some(conf.defaultDataSourceName)
-          )
-
+            storage = CatalogStorageFormat.empty.copy(locationUri = location),
+            provider = Some(conf.defaultDataSourceName))
           CreateTable(newTableDesc, mode, Some(q))
         } else {
           CreateTable(tableDesc, mode, Some(q))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2a9743130d4c4cb70cfe746b94d0da0aef684000..d4b28274cc4539325299d892a6d20b47aeca13d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -57,13 +57,14 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
+    val pathOption = table.storage.locationUri.map("path" -> _)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties).resolveRelation()
+        options = table.storage.properties ++ pathOption).resolveRelation()
 
     dataSource match {
       case fs: HadoopFsRelation =>
@@ -85,14 +86,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     val newTable = table.copy(
-      storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
       partitionColumnNames = partitionColumnNames,
       // If metastore partition management for file source tables is enabled, we start off with
@@ -140,12 +134,6 @@ case class CreateDataSourceTableAsSelectCommand(
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
@@ -162,13 +150,7 @@ case class CreateDataSourceTableAsSelectCommand(
           return Seq.empty[Row]
         case SaveMode.Append =>
           // Check if the specified data source match the data source of the existing table.
-          val dataSource = DataSource(
-            sparkSession = sparkSession,
-            userSpecifiedSchema = Some(query.schema.asNullable),
-            partitionColumns = table.partitionColumnNames,
-            bucketSpec = table.bucketSpec,
-            className = provider,
-            options = optionsWithPath)
+          val existingProvider = DataSource.lookupDataSource(provider)
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
@@ -178,7 +160,7 @@ case class CreateDataSourceTableAsSelectCommand(
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               // check if the file formats match
               l.relation match {
-                case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
+                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
                   throw new AnalysisException(
                     s"The file format of the existing table $tableName is " +
                       s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
@@ -213,13 +195,20 @@ case class CreateDataSourceTableAsSelectCommand(
       case None => data
     }
 
+    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+      Some(sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.locationUri
+    }
+
     // Create the relation based on the data of df.
+    val pathOption = tableLocation.map("path" -> _)
     val dataSource = DataSource(
       sparkSession,
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = optionsWithPath)
+      options = table.storage.properties ++ pathOption)
 
     val result = try {
       dataSource.write(mode, df)
@@ -230,7 +219,7 @@ case class CreateDataSourceTableAsSelectCommand(
     }
     if (createMetastoreTable) {
       val newTable = table.copy(
-        storage = table.storage.copy(properties = optionsWithPath),
+        storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 61e0550cef5e3074aaa39d6b62eff56f30b233b3..52af915b0be654a9871c33a8c33ac6e98d1a98be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -485,14 +485,6 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }
 
-  private def getBasePath(table: CatalogTable): Option[String] = {
-    if (table.provider == Some("hive")) {
-      table.storage.locationUri
-    } else {
-      new CaseInsensitiveMap(table.storage.properties).get("path")
-    }
-  }
-
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -503,13 +495,12 @@ case class AlterTableRecoverPartitionsCommand(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
 
-    val tablePath = getBasePath(table)
-    if (tablePath.isEmpty) {
+    if (table.storage.locationUri.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }
 
-    val root = new Path(tablePath.get)
+    val root = new Path(table.storage.locationUri.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
 
@@ -688,15 +679,7 @@ case class AlterTableSetLocationCommand(
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
-        val newTable =
-          if (DDLUtils.isDatasourceTable(table)) {
-            table.withNewStorage(
-              locationUri = Some(location),
-              properties = table.storage.properties ++ Map("path" -> location))
-          } else {
-            table.withNewStorage(locationUri = Some(location))
-          }
-        catalog.alterTable(newTable)
+        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
     }
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4acfffb628047134dbb05c77b25957df1f1ccd20..f32c956f5999e9813472f3c2719bf19e7e472520 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -62,25 +63,6 @@ case class CreateTableLikeCommand(
     val catalog = sparkSession.sessionState.catalog
     val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
-    // Storage format
-    val newStorage =
-      if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
-        val newPath = catalog.defaultTablePath(targetTable)
-        CatalogStorageFormat.empty.copy(properties = Map("path" -> newPath))
-      } else if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
-        val newPath = catalog.defaultTablePath(targetTable)
-        val newSerdeProp =
-          sourceTableDesc.storage.properties.filterKeys(_.toLowerCase != "path") ++
-            Map("path" -> newPath)
-        sourceTableDesc.storage.copy(
-          locationUri = None,
-          properties = newSerdeProp)
-      } else {
-        sourceTableDesc.storage.copy(
-          locationUri = None,
-          properties = sourceTableDesc.storage.properties)
-      }
-
     val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
       Some(sparkSession.sessionState.conf.defaultDataSourceName)
     } else {
@@ -91,7 +73,8 @@ case class CreateTableLikeCommand(
       CatalogTable(
         identifier = targetTable,
         tableType = CatalogTableType.MANAGED,
-        storage = newStorage,
+        // We are creating a new managed table, which should not have custom table location.
+        storage = sourceTableDesc.storage.copy(locationUri = None),
         schema = sourceTableDesc.schema,
         provider = newProvider,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -170,13 +153,6 @@ case class AlterTableRenameCommand(
           case NonFatal(e) => log.warn(e.toString, e)
         }
       }
-      // For datasource tables, we also need to update the "path" serde property
-      if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
-        val newPath = catalog.defaultTablePath(newName)
-        val newTable = table.withNewStorage(
-          properties = table.storage.properties ++ Map("path" -> newPath))
-        catalog.alterTable(newTable)
-      }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
       catalog.refreshTable(oldName)
@@ -367,8 +343,9 @@ case class TruncateTableCommand(
       DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
     }
     val locations =
-      if (DDLUtils.isDatasourceTable(table)) {
-        Seq(table.storage.properties.get("path"))
+      // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
+      if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
+        Seq(table.storage.locationUri)
       } else if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
@@ -916,17 +893,18 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
   }
 
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val props = metadata.properties
-
     builder ++= s"USING ${metadata.provider.get}\n"
 
-    val dataSourceOptions = metadata.storage.properties.filterNot {
-      case (key, value) =>
+    val dataSourceOptions = metadata.storage.properties.map {
+      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+    } ++ metadata.storage.locationUri.flatMap { location =>
+      if (metadata.tableType == MANAGED) {
         // If it's a managed table, omit PATH option. Spark SQL always creates external table
         // when the table creation DDL contains the PATH option.
-        key.toLowerCase == "path" && metadata.tableType == MANAGED
-    }.map {
-      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+        None
+      } else {
+        Some(s"path '${escapeSingleQuotedString(location)}'")
+      }
     }
 
     if (dataSourceOptions.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3f956c427655ec6b5b87ba115d7bb4576cc06ebb..0b50448a7af1834384577fc609adced15e11bebb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -78,115 +78,9 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] = lookupDataSource(className)
+  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
 
-  /** A map to maintain backward compatibility in case we move data sources around. */
-  private val backwardCompatibilityMap: Map[String, String] = {
-    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
-    val json = classOf[JsonFileFormat].getCanonicalName
-    val parquet = classOf[ParquetFileFormat].getCanonicalName
-    val csv = classOf[CSVFileFormat].getCanonicalName
-    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
-    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
-
-    Map(
-      "org.apache.spark.sql.jdbc" -> jdbc,
-      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
-      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
-      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
-      "org.apache.spark.sql.json" -> json,
-      "org.apache.spark.sql.json.DefaultSource" -> json,
-      "org.apache.spark.sql.execution.datasources.json" -> json,
-      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
-      "org.apache.spark.sql.parquet" -> parquet,
-      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
-      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
-      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
-      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
-      "org.apache.spark.sql.hive.orc" -> orc,
-      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
-      "org.apache.spark.ml.source.libsvm" -> libsvm,
-      "com.databricks.spark.csv" -> csv
-    )
-  }
-
-  /**
-   * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
-   */
-  private val spark2RemovedClasses = Set(
-    "org.apache.spark.sql.DataFrame",
-    "org.apache.spark.sql.sources.HadoopFsRelationProvider",
-    "org.apache.spark.Logging")
-
-  /** Given a provider name, look up the data source class definition. */
-  private def lookupDataSource(provider0: String): Class[_] = {
-    val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
-    val provider2 = s"$provider.DefaultSource"
-    val loader = Utils.getContextOrSparkClassLoader
-    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
-    try {
-      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-        // the provider format did not match any given registered aliases
-        case Nil =>
-          try {
-            Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-              case Success(dataSource) =>
-                // Found the data source using fully qualified path
-                dataSource
-              case Failure(error) =>
-                if (provider.toLowerCase == "orc" ||
-                  provider.startsWith("org.apache.spark.sql.hive.orc")) {
-                  throw new AnalysisException(
-                    "The ORC data source must be used with Hive support enabled")
-                } else if (provider.toLowerCase == "avro" ||
-                  provider == "com.databricks.spark.avro") {
-                  throw new AnalysisException(
-                    s"Failed to find data source: ${provider.toLowerCase}. Please find an Avro " +
-                      "package at " +
-                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects")
-                } else {
-                  throw new ClassNotFoundException(
-                    s"Failed to find data source: $provider. Please find packages at " +
-                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects",
-                    error)
-                }
-            }
-          } catch {
-            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
-              // NoClassDefFoundError's class name uses "/" rather than "." for packages
-              val className = e.getMessage.replaceAll("/", ".")
-              if (spark2RemovedClasses.contains(className)) {
-                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
-                  "Please check if your library is compatible with Spark 2.0", e)
-              } else {
-                throw e
-              }
-          }
-        case head :: Nil =>
-          // there is exactly one registered alias
-          head.getClass
-        case sources =>
-          // There are multiple registered aliases for the input
-          sys.error(s"Multiple sources found for $provider " +
-            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-            "please specify the fully qualified class name.")
-      }
-    } catch {
-      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
-        // NoClassDefFoundError's class name uses "/" rather than "." for packages
-        val className = e.getCause.getMessage.replaceAll("/", ".")
-        if (spark2RemovedClasses.contains(className)) {
-          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
-            "Please remove the incompatible library from classpath or upgrade it. " +
-            s"Error: ${e.getMessage}", e)
-        } else {
-          throw e
-        }
-    }
-  }
-
   /**
    * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
    */
@@ -470,13 +364,14 @@ case class DataSource(
         //  1. Only one output path can be specified on the write path;
         //  2. Output path must be a legal HDFS style file system path;
         //  3. It's OK that the output path doesn't exist yet;
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val outputPath = {
-          val path = new Path(caseInsensitiveOptions.getOrElse("path", {
-            throw new IllegalArgumentException("'path' is not specified")
-          }))
+        val allPaths = paths ++ new CaseInsensitiveMap(options).get("path")
+        val outputPath = if (allPaths.length == 1) {
+          val path = new Path(allPaths.head)
           val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        } else {
+          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+            s"got: ${allPaths.mkString(", ")}")
         }
 
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -539,3 +434,123 @@ case class DataSource(
     }
   }
 }
+
+object DataSource {
+
+  /** A map to maintain backward compatibility in case we move data sources around. */
+  private val backwardCompatibilityMap: Map[String, String] = {
+    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+    val json = classOf[JsonFileFormat].getCanonicalName
+    val parquet = classOf[ParquetFileFormat].getCanonicalName
+    val csv = classOf[CSVFileFormat].getCanonicalName
+    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+    Map(
+      "org.apache.spark.sql.jdbc" -> jdbc,
+      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+      "org.apache.spark.sql.json" -> json,
+      "org.apache.spark.sql.json.DefaultSource" -> json,
+      "org.apache.spark.sql.execution.datasources.json" -> json,
+      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+      "org.apache.spark.sql.parquet" -> parquet,
+      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+      "org.apache.spark.sql.hive.orc" -> orc,
+      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+      "org.apache.spark.ml.source.libsvm" -> libsvm,
+      "com.databricks.spark.csv" -> csv
+    )
+  }
+
+  /**
+   * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
+   */
+  private val spark2RemovedClasses = Set(
+    "org.apache.spark.sql.DataFrame",
+    "org.apache.spark.sql.sources.HadoopFsRelationProvider",
+    "org.apache.spark.Logging")
+
+  /** Given a provider name, look up the data source class definition. */
+  def lookupDataSource(provider: String): Class[_] = {
+    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+    val provider2 = s"$provider1.DefaultSource"
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+    try {
+      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
+        // the provider format did not match any given registered aliases
+        case Nil =>
+          try {
+            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
+              case Success(dataSource) =>
+                // Found the data source using fully qualified path
+                dataSource
+              case Failure(error) =>
+                if (provider1.toLowerCase == "orc" ||
+                  provider1.startsWith("org.apache.spark.sql.hive.orc")) {
+                  throw new AnalysisException(
+                    "The ORC data source must be used with Hive support enabled")
+                } else if (provider1.toLowerCase == "avro" ||
+                  provider1 == "com.databricks.spark.avro") {
+                  throw new AnalysisException(
+                    s"Failed to find data source: ${provider1.toLowerCase}. Please find an Avro " +
+                      "package at " +
+                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects")
+                } else {
+                  throw new ClassNotFoundException(
+                    s"Failed to find data source: $provider1. Please find packages at " +
+                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects",
+                    error)
+                }
+            }
+          } catch {
+            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
+              // NoClassDefFoundError's class name uses "/" rather than "." for packages
+              val className = e.getMessage.replaceAll("/", ".")
+              if (spark2RemovedClasses.contains(className)) {
+                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
+                  "Please check if your library is compatible with Spark 2.0", e)
+              } else {
+                throw e
+              }
+          }
+        case head :: Nil =>
+          // there is exactly one registered alias
+          head.getClass
+        case sources =>
+          // There are multiple registered aliases for the input
+          sys.error(s"Multiple sources found for $provider1 " +
+            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+            "please specify the fully qualified class name.")
+      }
+    } catch {
+      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
+        // NoClassDefFoundError's class name uses "/" rather than "." for packages
+        val className = e.getCause.getMessage.replaceAll("/", ".")
+        if (spark2RemovedClasses.contains(className)) {
+          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
+            "Please remove the incompatible library from classpath or upgrade it. " +
+            s"Error: ${e.getMessage}", e)
+        } else {
+          throw e
+        }
+    }
+  }
+
+  /**
+   * When creating a data source table, the `path` option has a special meaning: the table location.
+   * This method extracts the `path` option and treat it as table location to build a
+   * [[CatalogStorageFormat]]. Note that, the `path` option is removed from options after this.
+   */
+  def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
+    val path = new CaseInsensitiveMap(options).get("path")
+    val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
+    CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 47c1f9d3fac1e07abc1cca663e95f71385c5affe..e87998fe4ad8d1523c5046c987a0feaf9e852f48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -237,6 +237,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       sparkSession: SparkSession,
       simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
     val table = simpleCatalogRelation.catalogTable
+    val pathOption = table.storage.locationUri.map("path" -> _)
     val dataSource =
       DataSource(
         sparkSession,
@@ -244,7 +245,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
         partitionColumns = table.partitionColumnNames,
         bucketSpec = table.bucketSpec,
         className = table.provider.get,
-        options = table.storage.properties)
+        options = table.storage.properties ++ pathOption)
 
     LogicalRelation(
       dataSource.resolveRelation(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 44fd38dfb96f68361adc0dffd387f772d5cd447f..d3e323cb128914588bf465e1c9ae00ee07442a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
 
 
@@ -354,7 +354,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val tableDesc = CatalogTable(
       identifier = tableIdent,
       tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat.empty.copy(properties = options),
+      storage = DataSource.buildStorageFormatFromOptions(options),
       schema = schema,
       provider = Some(source)
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 9fb0f5384d889460f3dc093bdc2ece0474d647b9..bde3c8a42e1c01cccdea53cc455fcb3433587ece 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1145,7 +1145,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           assert(storageFormat.properties.isEmpty)
           assert(storageFormat.locationUri === Some(expected))
         } else {
-          assert(storageFormat.properties.get("path") === Some(expected))
           assert(storageFormat.locationUri === Some(expected))
         }
       } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..bef47aacd337969b2d288d82f9dab8b3582affc2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
+
+class TestOptionsSource extends SchemaRelationProvider with CreatableRelationProvider {
+
+  // This is used in the read path.
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
+    new TestOptionsRelation(parameters)(sqlContext.sparkSession)
+  }
+
+  // This is used in the write path.
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    new TestOptionsRelation(parameters)(sqlContext.sparkSession)
+  }
+}
+
+class TestOptionsRelation(val options: Map[String, String])(@transient val session: SparkSession)
+  extends BaseRelation {
+
+  override def sqlContext: SQLContext = session.sqlContext
+
+  def pathOption: Option[String] = options.get("path")
+
+  // We can't get the relation directly for write path, here we put the path option in schema
+  // metadata, so that we can test it later.
+  override def schema: StructType = {
+    val metadataWithPath = pathOption.map {
+      path => new MetadataBuilder().putString("path", path).build()
+    }
+    new StructType().add("i", IntegerType, true, metadataWithPath.getOrElse(Metadata.empty))
+  }
+}
+
+class PathOptionSuite extends DataSourceTest with SharedSQLContext {
+
+  test("path option always exist") {
+    withTable("src") {
+      sql(
+        s"""
+           |CREATE TABLE src(i int)
+           |USING ${classOf[TestOptionsSource].getCanonicalName}
+           |OPTIONS (PATH '/tmp/path')
+        """.stripMargin)
+      assert(getPathOption("src") == Some("/tmp/path"))
+    }
+
+    // should exist even path option is not specified when creating table
+    withTable("src") {
+      sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
+      assert(getPathOption("src") == Some(defaultTablePath("src")))
+    }
+  }
+
+  test("path option also exist for write path") {
+    withTable("src") {
+      withTempPath { path =>
+        sql(
+          s"""
+            |CREATE TABLE src
+            |USING ${classOf[TestOptionsSource].getCanonicalName}
+            |OPTIONS (PATH '${path.getAbsolutePath}')
+            |AS SELECT 1
+          """.stripMargin)
+        assert(spark.table("src").schema.head.metadata.getString("path") == path.getAbsolutePath)
+      }
+    }
+
+    // should exist even path option is not specified when creating table
+    withTable("src") {
+      sql(
+        s"""
+           |CREATE TABLE src
+           |USING ${classOf[TestOptionsSource].getCanonicalName}
+           |AS SELECT 1
+          """.stripMargin)
+      assert(spark.table("src").schema.head.metadata.getString("path") == defaultTablePath("src"))
+    }
+  }
+
+  test("path option always represent the value of table location") {
+    withTable("src") {
+      sql(
+        s"""
+           |CREATE TABLE src(i int)
+           |USING ${classOf[TestOptionsSource].getCanonicalName}
+           |OPTIONS (PATH '/tmp/path')""".stripMargin)
+      sql("ALTER TABLE src SET LOCATION '/tmp/path2'")
+      assert(getPathOption("src") == Some("/tmp/path2"))
+    }
+
+    withTable("src", "src2") {
+      sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
+      sql("ALTER TABLE src RENAME TO src2")
+      assert(getPathOption("src2") == Some(defaultTablePath("src2")))
+    }
+  }
+
+  private def getPathOption(tableName: String): Option[String] = {
+    spark.table(tableName).queryExecution.analyzed.collect {
+      case LogicalRelation(r: TestOptionsRelation, _, _) => r.pathOption
+    }.head
+  }
+
+  private def defaultTablePath(tableName: String): String = {
+    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName))
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 409c316c6802c0ba7a04ae052a080e1c9d54d36f..ebba203ac593cc0426341d0d3e7955741385a037 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.thrift.TException
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
@@ -38,9 +38,8 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
-import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 
 
 /**
@@ -189,66 +188,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
     // Before saving data source table metadata into Hive metastore, we should:
-    //  1. Put table provider, schema, partition column names, bucket specification and partition
-    //     provider in table properties.
+    //  1. Put table metadata like provider, schema, etc. in table properties.
     //  2. Check if this table is hive compatible
-    //    2.1  If it's not hive compatible, set schema, partition columns and bucket spec to empty
-    //         and save table metadata to Hive.
+    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+    //         spec to empty and save table metadata to Hive.
     //    2.2  If it's hive compatible, set serde information in table metadata and try to save
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
-      // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-      val provider = tableDefinition.provider.get
-      val partitionColumns = tableDefinition.partitionColumnNames
-      val bucketSpec = tableDefinition.bucketSpec
-
-      val tableProperties = new scala.collection.mutable.HashMap[String, String]
-      tableProperties.put(DATASOURCE_PROVIDER, provider)
-      if (tableDefinition.partitionProviderIsHive) {
-        tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
-      }
-
-      // Serialized JSON schema string may be too long to be stored into a single metastore table
-      // property. In this case, we split the JSON string and store each part as a separate table
-      // property.
-      val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-      val schemaJsonString = tableDefinition.schema.json
-      // Split the JSON string.
-      val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-      parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-      }
-
-      if (partitionColumns.nonEmpty) {
-        tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
-        partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-          tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
-        }
-      }
-
-      if (bucketSpec.isDefined) {
-        val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+      val tableProperties = tableMetaToTableProps(tableDefinition)
 
-        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
-        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
-        bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-          tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
-        }
-
-        if (sortColumnNames.nonEmpty) {
-          tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
-          sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-            tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
-          }
-        }
+      val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
+        tableDefinition.storage.locationUri.isEmpty
+      val tableLocation = if (needDefaultTableLocation) {
+        Some(defaultTablePath(tableDefinition.identifier))
+      } else {
+        tableDefinition.storage.locationUri
       }
+      // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
+      // However, in older version of Spark we already store table location in storage properties
+      // with key "path". Here we keep this behaviour for backward compatibility.
+      val storagePropsWithLocation = tableDefinition.storage.properties ++
+        tableLocation.map("path" -> _)
 
       // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
       // bucket specification to empty. Note that partition columns are retained, so that we can
       // call partition-related Hive API later.
       def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
         tableDefinition.copy(
+          // Hive only allows directory paths as location URIs while Spark SQL data source tables
+          // also allow file paths. For non-hive-compatible format, we should not set location URI
+          // to avoid hive metastore to throw exception.
+          storage = tableDefinition.storage.copy(
+            locationUri = None,
+            properties = storagePropsWithLocation),
           schema = tableDefinition.partitionSchema,
           bucketSpec = None,
           properties = tableDefinition.properties ++ tableProperties)
@@ -259,10 +231,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         val location = if (tableDefinition.tableType == EXTERNAL) {
           // When we hit this branch, we are saving an external data source table with hive
           // compatible format, which means the data source is file-based and must have a `path`.
-          val map = new CaseInsensitiveMap(tableDefinition.storage.properties)
-          require(map.contains("path"),
+          require(tableDefinition.storage.locationUri.isDefined,
             "External file-based data source table must have a `path` entry in storage properties.")
-          Some(new Path(map("path")).toUri.toString)
+          Some(new Path(tableDefinition.storage.locationUri.get).toUri.toString)
         } else {
           None
         }
@@ -272,7 +243,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
             locationUri = location,
             inputFormat = serde.inputFormat,
             outputFormat = serde.outputFormat,
-            serde = serde.serde
+            serde = serde.serde,
+            properties = storagePropsWithLocation
           ),
           properties = tableDefinition.properties ++ tableProperties)
       }
@@ -337,6 +309,68 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  /**
+   * Data source tables may be non Hive compatible and we need to store table metadata in table
+   * properties to workaround some Hive metastore limitations.
+   * This method puts table provider, partition provider, schema, partition column names, bucket
+   * specification into a map, which can be used as table properties later.
+   */
+  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
+    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+    val partitionColumns = table.partitionColumnNames
+    val bucketSpec = table.bucketSpec
+
+    val properties = new scala.collection.mutable.HashMap[String, String]
+    properties.put(DATASOURCE_PROVIDER, provider)
+    if (table.partitionProviderIsHive) {
+      properties.put(TABLE_PARTITION_PROVIDER, "hive")
+    }
+
+    // Serialized JSON schema string may be too long to be stored into a single metastore table
+    // property. In this case, we split the JSON string and store each part as a separate table
+    // property.
+    val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
+    val schemaJsonString = table.schema.json
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+    parts.zipWithIndex.foreach { case (part, index) =>
+      properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
+    }
+
+    if (partitionColumns.nonEmpty) {
+      properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
+      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+        properties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
+      }
+    }
+
+    if (bucketSpec.isDefined) {
+      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+      properties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+      properties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
+      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+        properties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
+      }
+
+      if (sortColumnNames.nonEmpty) {
+        properties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
+        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+          properties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
+        }
+      }
+    }
+
+    properties
+  }
+
+  private def defaultTablePath(tableIdent: TableIdentifier): String = {
+    val dbLocation = getDatabase(tableIdent.database.get).locationUri
+    new Path(new Path(dbLocation), tableIdent.table).toString
+  }
+
   private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     assert(DDLUtils.isDatasourceTable(tableDefinition),
       "saveTableIntoHive only takes data source table.")
@@ -383,11 +417,35 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val newTable = client.getTable(db, oldName)
-      .copy(identifier = TableIdentifier(newName, Some(db)))
+    val rawTable = client.getTable(db, oldName)
+
+    val storageWithNewPath = if (rawTable.tableType == MANAGED) {
+      // If it's a managed table and we are renaming it, then the path option becomes inaccurate
+      // and we need to update it according to the new table name.
+      val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
+      updateLocationInStorageProps(rawTable, Some(newTablePath))
+    } else {
+      rawTable.storage
+    }
+
+    val newTable = rawTable.copy(
+      identifier = TableIdentifier(newName, Some(db)),
+      storage = storageWithNewPath)
+
     client.alterTable(oldName, newTable)
   }
 
+  private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
+    new CaseInsensitiveMap(table.storage.properties).get("path")
+  }
+
+  private def updateLocationInStorageProps(
+      table: CatalogTable,
+      newPath: Option[String]): CatalogStorageFormat = {
+    val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path")
+    table.storage.copy(properties = propsWithoutPath ++ newPath.map("path" -> _))
+  }
+
   /**
    * Alter a table whose name that matches the one specified in `tableDefinition`,
    * assuming the table exists.
@@ -418,21 +476,36 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     if (DDLUtils.isDatasourceTable(withStatsProps)) {
-      val oldDef = client.getTable(db, withStatsProps.identifier.table)
-      // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
-      // to retain the spark specific format if it is. Also add old data source properties to table
-      // properties, to retain the data source table format.
-      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val oldTableDef = client.getTable(db, withStatsProps.identifier.table)
+
+      val oldLocation = getLocationFromStorageProps(oldTableDef)
+      val newLocation = tableDefinition.storage.locationUri
+      // Only update the `locationUri` field if the location is really changed, because this table
+      // may be not Hive-compatible and can not set the `locationUri` field. We should respect the
+      // old `locationUri` even it's None.
+      val storageWithNewLocation = if (oldLocation == newLocation) {
+        oldTableDef.storage
+      } else {
+        updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
+      }
+
       val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
         TABLE_PARTITION_PROVIDER -> "hive"
       } else {
         TABLE_PARTITION_PROVIDER -> "builtin"
       }
+
+      // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+      // to retain the spark specific format if it is. Also add old data source properties to table
+      // properties, to retain the data source table format.
+      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
       val newDef = withStatsProps.copy(
-        schema = oldDef.schema,
-        partitionColumnNames = oldDef.partitionColumnNames,
-        bucketSpec = oldDef.bucketSpec,
-        properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
+        storage = storageWithNewLocation,
+        schema = oldTableDef.schema,
+        partitionColumnNames = oldTableDef.partitionColumnNames,
+        bucketSpec = oldTableDef.bucketSpec,
+        properties = newTableProps)
 
       client.alterTable(newDef)
     } else {
@@ -465,22 +538,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     } else {
       getProviderFromTableProperties(table).map { provider =>
         assert(provider != "hive", "Hive serde table should not save provider in table properties.")
-        // SPARK-15269: Persisted data source tables always store the location URI as a storage
-        // property named "path" instead of standard Hive `dataLocation`, because Hive only
-        // allows directory paths as location URIs while Spark SQL data source tables also
-        // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
-        // data source tables.
-        // Spark SQL may also save external data source in Hive compatible format when
-        // possible, so that these tables can be directly accessed by Hive. For these tables,
-        // `dataLocation` is still necessary. Here we also check for input format because only
-        // these Hive compatible tables set this field.
-        val storage = if (table.tableType == EXTERNAL && table.storage.inputFormat.isEmpty) {
-          table.storage.copy(locationUri = None)
-        } else {
-          table.storage
+        // Internally we store the table location in storage properties with key "path" for data
+        // source tables. Here we set the table location to `locationUri` field and filter out the
+        // path option in storage properties, to avoid exposing this concept externally.
+        val storageWithLocation = {
+          val tableLocation = getLocationFromStorageProps(table)
+          updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
         }
+
         table.copy(
-          storage = storage,
+          storage = storageWithLocation,
           schema = getSchemaFromTableProperties(table),
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 624ab747e442f6cd3eb1f86b508a3438a51b04b6..8e5fc88aad448d1864e5d2ef69a5fe6ab8b9e10d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,16 +17,13 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.JavaConverters._
-
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -56,12 +53,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       tableIdent.table.toLowerCase)
   }
 
-  private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
-    QualifiedTableName(
-      t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
-      t.identifier.table.toLowerCase)
-  }
-
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
@@ -69,6 +60,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logDebug(s"Creating new cached data source for $in")
         val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
 
+        val pathOption = table.storage.locationUri.map("path" -> _)
         val dataSource =
           DataSource(
             sparkSession,
@@ -76,7 +68,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             partitionColumns = table.partitionColumnNames,
             bucketSpec = table.bucketSpec,
             className = table.provider.get,
-            options = table.storage.properties,
+            options = table.storage.properties ++ pathOption,
             catalogTable = Some(table))
 
         LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 0477ea4d4c380fb94efd3a7a4b8474f2ca677e82..7abc4d9623f7123108d3735243a6e85fe7d9d93b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -142,8 +142,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.serde === Some(serde))
 
           assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
-          assert(hiveTable.storage.locationUri ===
-            Some(path.toURI.toString.stripSuffix(File.separator)))
+          assert(hiveTable.storage.locationUri === Some(path.toString))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index eaa67d370db37eb21714b6e456b1e8ffc7cfe4f1..c50f92e783c8803b839e2915e0aa39def2f8b569 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -998,7 +998,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         identifier = TableIdentifier("not_skip_hive_metadata"),
         tableType = CatalogTableType.EXTERNAL,
         storage = CatalogStorageFormat.empty.copy(
-          properties = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false")
+          locationUri = Some(tempPath.getCanonicalPath),
+          properties = Map("skipHiveMetadata" -> "false")
         ),
         schema = schema,
         provider = Some("parquet")
@@ -1282,9 +1283,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sql("insert into t values (2, 3, 4)")
         checkAnswer(table("t"), Seq(Row(1, 2, 3), Row(2, 3, 4)))
         val catalogTable = hiveClient.getTable("default", "t")
-        // there should not be a lowercase key 'path' now
-        assert(catalogTable.storage.properties.get("path").isEmpty)
-        assert(catalogTable.storage.properties.get("PATH").isDefined)
+        assert(catalogTable.storage.locationUri.isDefined)
       }
     }
   }
@@ -1351,4 +1350,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
+
+  test("SPARK-17470: support old table that stores table location in storage properties") {
+    withTable("old") {
+      withTempPath { path =>
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        val tableDesc = CatalogTable(
+          identifier = TableIdentifier("old", Some("default")),
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(
+            properties = Map("path" -> path.getAbsolutePath)
+          ),
+          schema = new StructType(),
+          properties = Map(
+            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet",
+            HiveExternalCatalog.DATASOURCE_SCHEMA ->
+              new StructType().add("i", "int").add("j", "string").json))
+        hiveClient.createTable(tableDesc, ignoreIfExists = false)
+        checkAnswer(spark.table("old"), Row(1, "a"))
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 7ba880e47613702e1af52d7767ef281db0fd5308..cfc1d81d544eb28b4223a5a0ece8622b3c1e1714 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     val expectedPath =
       spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
-    assert(metastoreTable.storage.properties("path") === expectedPath)
+    assert(metastoreTable.storage.locationUri.get === expectedPath)
   }
 
   private def getTableNames(dbName: Option[String] = None): Array[String] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e9268a922cf54e920ba92a501deb72148998b908..682d7d4b163ddfd042d3d3fec127b7c6714f691d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -859,14 +859,6 @@ class HiveDDLSuite
     }
   }
 
-  private def getTablePath(table: CatalogTable): Option[String] = {
-    if (DDLUtils.isDatasourceTable(table)) {
-      new CaseInsensitiveMap(table.storage.properties).get("path")
-    } else {
-      table.storage.locationUri
-    }
-  }
-
   private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
     // The created table should be a MANAGED table with empty view text and original text.
     assert(targetTable.tableType == CatalogTableType.MANAGED,
@@ -915,10 +907,8 @@ class HiveDDLSuite
       assert(targetTable.provider == sourceTable.provider)
     }
 
-    val sourceTablePath = getTablePath(sourceTable)
-    val targetTablePath = getTablePath(targetTable)
-    assert(targetTablePath.nonEmpty, "target table path should not be empty")
-    assert(sourceTablePath != targetTablePath,
+    assert(targetTable.storage.locationUri.nonEmpty, "target table path should not be empty")
+    assert(sourceTable.storage.locationUri != targetTable.storage.locationUri,
       "source table/view path should be different from target table path")
 
     // The source table contents should not been seen in the target table.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b9353b5b5d2a7daece970b9e703303429a5625de..3a597d6afb15328e73b9d767af484051ddc5642f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -517,7 +517,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
-      case LogicalRelation(r: HadoopFsRelation, _, _) =>
+      case LogicalRelation(r: HadoopFsRelation, _, Some(table)) =>
         if (!isDataSourceTable) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -525,7 +525,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         userSpecifiedLocation match {
           case Some(location) =>
-            assert(r.options("path") === location)
+            assert(table.storage.locationUri.get === location)
           case None => // OK.
         }
         assert(catalogTable.provider.get === format)