diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a97ed701c4207ad25aa59ff115b88466c54f198c..7c3bec897956a96a28af722b851c68ff652aee66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -89,9 +89,10 @@ case class CatalogTablePartition(
     parameters: Map[String, String] = Map.empty) {
 
   override def toString: String = {
+    val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     val output =
       Seq(
-        s"Partition Values: [${spec.values.mkString(", ")}]",
+        s"Partition Values: [$specString]",
         s"$storage",
         s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
 
@@ -137,6 +138,8 @@ case class BucketSpec(
  *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *        underlying table but not supported by Spark SQL yet.
+ * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
+ *                                metastore.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -154,7 +157,8 @@ case class CatalogTable(
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
-    unsupportedFeatures: Seq[String] = Seq.empty) {
+    unsupportedFeatures: Seq[String] = Seq.empty,
+    partitionProviderIsHive: Boolean = false) {
 
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
@@ -212,11 +216,11 @@ case class CatalogTable(
         comment.map("Comment: " + _).getOrElse(""),
         if (properties.nonEmpty) s"Properties: $tableProperties" else "",
         if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
-        s"$storage")
+        s"$storage",
+        if (partitionProviderIsHive) "Partition Provider: Hive" else "")
 
     output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
   }
-
 }
 
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb0426c7a98a18fbe7d01efa6d11cb32267e2be7..3eff12f9eed14e92761c5ccf097f1eaac3d7de6d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,6 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
         "owner" -> "",
         "createTime" -> 0,
         "lastAccessTime" -> -1,
+        "partitionProviderIsHive" -> false,
         "properties" -> JNull,
         "unsupportedFeatures" -> List.empty[String]))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4b5f0246b9a1bd4fbbed0950eda404f4898cdca0..7ff3522f547d3058b774b5e3c3d0536889a3a3d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,8 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -387,7 +388,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           partitionColumnNames = partitioningColumns.getOrElse(Nil),
           bucketSpec = getBucketSpec
         )
-        val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+        val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+        val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
+            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
+          // Need to recover partitions into the metastore so our saved data is visible.
+          val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
+          Union(createCmd, recoverPartitionCmd)
+        } else {
+          createCmd
+        }
         df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 488138709a12b9d733dded6dee2d805c38a4ea69..f873f34a845ef16b0375f03dcb397027cbbd0e1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
           AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
 
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
 
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 7b0e49b665f42a1ab0a6a1d3bb3da62bc4f949cc..52a8fc88c56cd9059e1d4a8a3ec3e91550cb54f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -51,7 +51,8 @@ case class AnalyzeTableCommand(
 
       // data source tables have been converted into LogicalRelations
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateTableStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
 
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a8c75a7f29cef6b6fe3a2bee070d5628e1ef5e35..2a9743130d4c4cb70cfe746b94d0da0aef684000 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames)
+      partitionColumnNames = partitionColumnNames,
+      // If metastore partition management for file source tables is enabled, we start off with
+      // partition provider hive, but no partitions in the metastore. The user has to call
+      // `msck repair table` to populate the table partitions.
+      partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+        sparkSession.sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+
     Seq.empty[Row]
   }
 }
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
       sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }
 
+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+      case _ =>
+    }
+
     // Refresh the cache of the table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15656faa08e4f4024fdc4133cb2b2ff27a03d34c..61e0550cef5e3074aaa39d6b62eff56f30b233b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
     val parts = partitionSpecsAndLocs.map { case (spec, location) =>
       val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
         spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
-    }
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
 
     val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
       oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
     val normalizedSpecs = specs.map { spec =>
       PartitioningUtils.normalizePartitionSpec(
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }
 
+  private def getBasePath(table: CatalogTable): Option[String] = {
+    if (table.provider == Some("hive")) {
+      table.storage.locationUri
+    } else {
+      new CaseInsensitiveMap(table.storage.properties).get("path")
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     val tableIdentWithDB = table.identifier.quotedString
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
-    }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
-    if (table.storage.locationUri.isEmpty) {
+
+    val tablePath = getBasePath(table)
+    if (tablePath.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }
 
-    val root = new Path(table.storage.locationUri.get)
+    val root = new Path(tablePath.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
 
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
     logInfo(s"Finished to gather the fast stats for all $total partitions.")
 
     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+    // This is always the case for Hive format tables, but is not true for Datasource tables created
+    // before Spark 2.1 unless they are converted via `msck repair table`.
+    spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+    catalog.refreshTable(tableName)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
       path: Path,
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
-      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+      threshold: Int,
+      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
-        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+        val columnName = PartitioningUtils.unescapePathName(ps(0))
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
-        // comparing with case-insensitive, but preserve the case
-        if (columnName == partitionNames.head) {
-          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
-            partitionNames.drop(1), threshold)
+        if (resolver(columnName, partitionNames.head)) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+            partitionNames.drop(1), threshold, resolver)
         } else {
-          logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
+          logWarning(
+            s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
           Seq()
         }
       } else {
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     partitionSpec match {
       case Some(spec) =>
+        DDLUtils.verifyPartitionProviderIsHive(
+          sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart =
-          if (DDLUtils.isDatasourceTable(table)) {
-            throw new AnalysisException(
-              "ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
-              "using the datasource API")
-          } else {
-            part.copy(storage = part.storage.copy(locationUri = Some(location)))
-          }
+        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
@@ -709,6 +708,25 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get != "hive"
   }
 
+  /**
+   * Throws a standard error for actions that require partitionProvider = hive.
+   */
+  def verifyPartitionProviderIsHive(
+      spark: SparkSession, table: CatalogTable, action: String): Unit = {
+    val tableName = table.identifier.table
+    if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since filesource partition management is " +
+          "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+    }
+    if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+          "the Hive metastore. To import this information into the metastore, run " +
+          s"`msck repair table $tableName`")
+    }
+  }
+
   /**
    * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
    * issue an exception [[AnalysisException]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index aec25430b719d2d2f8a96f478aca0b5467b074b0..4acfffb628047134dbb05c77b25957df1f1ccd20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -358,19 +358,16 @@ case class TruncateTableCommand(
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
     }
-    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
-    if (isDatasourceTable && partitionSpec.isDefined) {
-      throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-        s"for tables created using the data sources API: $tableIdentwithDB")
-    }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
         s"for tables that are not partitioned: $tableIdentwithDB")
     }
+    if (partitionSpec.isDefined) {
+      DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
+    }
     val locations =
-      if (isDatasourceTable) {
+      if (DDLUtils.isDatasourceTable(table)) {
         Seq(table.storage.properties.get("path"))
       } else if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
@@ -453,7 +450,7 @@ case class DescribeTableCommand(
           describeFormattedTableInfo(metadata, result)
         }
       } else {
-        describeDetailedPartitionInfo(catalog, metadata, result)
+        describeDetailedPartitionInfo(sparkSession, catalog, metadata, result)
       }
     }
 
@@ -492,6 +489,10 @@ case class DescribeTableCommand(
     describeStorageInfo(table, buffer)
 
     if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
+
+    if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+      append(buffer, "Partition Provider:", "Hive", "")
+    }
   }
 
   private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
@@ -528,6 +529,7 @@ case class DescribeTableCommand(
   }
 
   private def describeDetailedPartitionInfo(
+      spark: SparkSession,
       catalog: SessionCatalog,
       metadata: CatalogTable,
       result: ArrayBuffer[Row]): Unit = {
@@ -535,10 +537,7 @@ case class DescribeTableCommand(
       throw new AnalysisException(
         s"DESC PARTITION is not allowed on a view: ${table.identifier}")
     }
-    if (DDLUtils.isDatasourceTable(metadata)) {
-      throw new AnalysisException(
-        s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
     val partition = catalog.getPartition(table, partitionSpec)
     if (isExtended) {
       describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
@@ -743,10 +742,7 @@ case class ShowPartitionsCommand(
         s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
     }
 
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")
 
     /**
      * Validate the partitioning spec by making sure all the referenced columns are
@@ -894,18 +890,11 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
     if (metadata.properties.nonEmpty) {
-      val filteredProps = metadata.properties.filterNot {
-        // Skips "EXTERNAL" property for external tables
-        case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
-      }
-
-      val props = filteredProps.map { case (key, value) =>
+      val props = metadata.properties.map { case (key, value) =>
         s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
       }
 
-      if (props.nonEmpty) {
-        builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
-      }
+      builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 17da606580eeae81651b6d730a5b0f09ccc16d46..5b8f05a396241c7708af0b9eeeb5c684f0ee8b83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -65,6 +65,8 @@ import org.apache.spark.util.Utils
  * @param partitionColumns A list of column names that the relation is partitioned by. When this
  *                         list is empty, the relation is unpartitioned.
  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
+ * @param catalogTable Optional catalog table reference that can be used to push down operations
+ *                     over the datasource to the catalog service.
  */
 case class DataSource(
     sparkSession: SparkSession,
@@ -73,7 +75,8 @@ case class DataSource(
     userSpecifiedSchema: Option[StructType] = None,
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
-    options: Map[String, String] = Map.empty) extends Logging {
+    options: Map[String, String] = Map.empty,
+    catalogTable: Option[CatalogTable] = None) extends Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
@@ -412,9 +415,16 @@ case class DataSource(
             })
         }
 
-        val fileCatalog =
+        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+            catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+          new TableFileCatalog(
+            sparkSession,
+            catalogTable.get,
+            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
+        } else {
           new ListingFileCatalog(
             sparkSession, globbedPaths, options, partitionSchema)
+        }
 
         val dataSchema = userSpecifiedSchema.map { schema =>
           val equality = sparkSession.sessionState.conf.resolver
@@ -423,7 +433,7 @@ case class DataSource(
           format.inferSchema(
             sparkSession,
             caseInsensitiveOptions,
-            fileCatalog.allFiles())
+            fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
         }.getOrElse {
           throw new AnalysisException(
             s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
@@ -432,7 +442,7 @@ case class DataSource(
 
         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+          partitionSchema = fileCatalog.partitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7d0abe86a44df0d52749fdaec4663d4a2b502b14..f0bcf94eadc960dfdb6c78f47faafcad02c2a5c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -179,7 +179,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
-      InsertIntoHadoopFsRelationCommand(
+      val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
         query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
         t.bucketSpec,
@@ -188,6 +188,15 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         t.options,
         query,
         mode)
+
+      if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
+          l.catalogTable.get.partitionProviderIsHive) {
+        // TODO(ekl) we should be more efficient here and only recover the newly added partitions
+        val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(l.catalogTable.get.identifier)
+        Union(insertCmd, recoverPartitionCmd)
+      } else {
+        insertCmd
+      }
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
index 2bc66ceeebdb4c4d1ff15086a9c18419a7707d90..dba64624c34b3ec9f224fcd4955c63d4a2280069 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
 
 /**
  * A collection of data files from a partitioned relation, along with the partition values in the
@@ -63,4 +64,7 @@ trait FileCatalog {
 
   /** Sum of table file sizes, in bytes */
   def sizeInBytes: Long
+
+  /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
+  def partitionSchema: StructType
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index e0ec748a0b34d3b0560913d8b0d09930c74eb935..7c2e6fd04d5dba25b7a50b9b4efb5462d39c1905 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -64,7 +64,7 @@ object FileStatusCache {
    */
   def newCache(session: SparkSession): FileStatusCache = {
     synchronized {
-      if (session.sqlContext.conf.filesourcePartitionPruning &&
+      if (session.sqlContext.conf.manageFilesourcePartitions &&
           session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
         if (sharedCache == null) {
           sharedCache = new SharedInMemoryCache(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9b1903c47119e1bf61588c93f33c3d4ce796fe5c..cc4049e9259053baafa44e27ef6eb71c82b7c5cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -38,19 +38,21 @@ import org.apache.spark.util.SerializableConfiguration
  * It provides the necessary methods to parse partition data based on a set of files.
  *
  * @param parameters as set of options to control partition discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- *                        discovered partitions
-*/
+ * @param userPartitionSchema an optional partition schema that will be use to provide types for
+ *                            the discovered partitions
+ */
 abstract class PartitioningAwareFileCatalog(
     sparkSession: SparkSession,
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
+    userPartitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
   import PartitioningAwareFileCatalog.BASE_PATH_PARAM
 
   /** Returns the specification of the partitions inferred from the data. */
   def partitionSpec(): PartitionSpec
 
+  override def partitionSchema: StructType = partitionSpec().partitionColumns
+
   protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
 
   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
@@ -122,7 +124,7 @@ abstract class PartitioningAwareFileCatalog(
     val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
       files.exists(f => isDataPath(f.getPath))
     }.keys.toSeq
-    partitionSchema match {
+    userPartitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
           leafDirs,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 667379b222c48d45ef12c1e7f87ed66078ebf681..b459df5734d43be7539ffaa71359ab85d7f90a62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -45,6 +46,8 @@ class TableFileCatalog(
 
   private val baseLocation = table.storage.locationUri
 
+  override def partitionSchema: StructType = table.partitionSchema
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
   override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
@@ -63,7 +66,6 @@ class TableFileCatalog(
     if (table.partitionColumnNames.nonEmpty) {
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
-      val partitionSchema = table.partitionSchema
       val partitions = selectedPartitions.map { p =>
         PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f47ec7f3963a47819df13662455c3d79ebd3cedd..dc31f3bc323f6b51901291ff9701330b63fab2a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -272,18 +272,20 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val HIVE_FILESOURCE_PARTITION_PRUNING =
-    SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
-      .doc("When true, enable metastore partition pruning for filesource relations as well. " +
-           "This is currently implemented for converted Hive tables only.")
+  val HIVE_MANAGE_FILESOURCE_PARTITIONS =
+    SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
+      .doc("When true, enable metastore partition management for file source tables as well. " +
+           "This includes both datasource and converted Hive tables. When partition managment " +
+           "is enabled, datasource tables store partition in the Hive metastore, and use the " +
+           "metastore to prune partitions during query planning.")
       .booleanConf
       .createWithDefault(true)
 
   val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
     SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
-      .doc("When nonzero, enable caching of partition file metadata in memory. All table share " +
+      .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
            "a cache that can use up to specified num bytes for file metadata. This conf only " +
-           "applies if filesource partition pruning is also enabled.")
+           "has an effect when hive filesource partition management is enabled.")
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
@@ -679,7 +681,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
-  def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+  def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 6857dd37286dd5ed1cc7c0da784b71e5fc04f059..2d73d9f1fc802066253448b807e55bc10e0c1fc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -197,7 +197,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       assertResult(expected.schema, s"Schema did not match for query #$i\n${expected.sql}") {
         output.schema
       }
-      assertResult(expected.output, s"Result dit not match for query #$i\n${expected.sql}") {
+      assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") {
         output.output
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b989d01ec787a997378fbb83b5f3c4ea8458dbcc..9fb0f5384d889460f3dc093bdc2ece0474d647b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -95,7 +95,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         .add("b", "int"),
       provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
-      createTime = 0L)
+      createTime = 0L,
+      partitionProviderIsHive = true)
   }
 
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
@@ -923,68 +924,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("alter table: rename partition") {
-    val catalog = spark.sessionState.catalog
-    val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    createPartitionedTable(tableIdent, isDatasourceTable = false)
-
-    // basic rename partition
-    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
-    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
-    // rename without explicitly specifying database
-    catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
-    // table to alter does not exist
-    intercept[NoSuchTableException] {
-      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
-    }
-
-    // partition to rename does not exist
-    intercept[NoSuchPartitionException] {
-      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
-    }
-
-    // partition spec in RENAME PARTITION should be case insensitive by default
-    sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+    testRenamePartitions(isDatasourceTable = false)
   }
 
   test("alter table: rename partition (datasource table)") {
-    createPartitionedTable(TableIdentifier("tab1", Some("dbx")), isDatasourceTable = true)
-    val e = intercept[AnalysisException] {
-      sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
-    }.getMessage
-    assert(e.contains(
-      "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API"))
-    // table to alter does not exist
-    intercept[NoSuchTableException] {
-      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
-    }
-  }
-
-  private def createPartitionedTable(
-      tableIdent: TableIdentifier,
-      isDatasourceTable: Boolean): Unit = {
-    val catalog = spark.sessionState.catalog
-    val part1 = Map("a" -> "1", "b" -> "q")
-    val part2 = Map("a" -> "2", "b" -> "c")
-    val part3 = Map("a" -> "3", "b" -> "p")
-    createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    createTablePartition(catalog, part1, tableIdent)
-    createTablePartition(catalog, part2, tableIdent)
-    createTablePartition(catalog, part3, tableIdent)
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3))
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    testRenamePartitions(isDatasourceTable = true)
   }
 
   test("show tables") {
@@ -1199,7 +1143,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       if (isDatasourceTable) {
         if (spec.isDefined) {
           assert(storageFormat.properties.isEmpty)
-          assert(storageFormat.locationUri.isEmpty)
+          assert(storageFormat.locationUri === Some(expected))
         } else {
           assert(storageFormat.properties.get("path") === Some(expected))
           assert(storageFormat.locationUri === Some(expected))
@@ -1212,18 +1156,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
     verifyLocation("/path/to/your/lovely/heart")
     // set table partition location
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
-    }
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
     verifyLocation("/path/to/part/ways", Some(partSpec))
     // set table location without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
     verifyLocation("/swanky/steak/place")
     // set table partition location without explicitly specifying database
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
-    }
+    sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
     verifyLocation("vienna", Some(partSpec))
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1354,26 +1294,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // basic add partition
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
-        "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-      assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
-      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
-      assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
-    }
+    sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+      "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
+    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
 
     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2, part3, part4))
-    }
+    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1386,22 +1318,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
 
     // partition to add already exists when using IF NOT EXISTS
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2, part3, part4))
-    }
+    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
 
     // partition spec in ADD PARTITION should be case insensitive by default
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2, part3, part4, part5))
-    }
+    sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4, part5))
   }
 
   private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1424,21 +1348,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
 
     // basic drop partition
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
-    }
+    sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
 
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
-    }
+    sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1451,20 +1367,56 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
 
     // partition to drop does not exist when using IF EXISTS
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
-    }
+    sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // partition spec in DROP PARTITION should be case insensitive by default
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    assert(catalog.listPartitions(tableIdent).isEmpty)
+  }
+
+  private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1", "b" -> "q")
+    val part2 = Map("a" -> "2", "b" -> "c")
+    val part3 = Map("a" -> "3", "b" -> "p")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    createTablePartition(catalog, part2, tableIdent)
+    createTablePartition(catalog, part3, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+
+    // basic rename partition
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+    // rename without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+    // table to alter does not exist
+    intercept[NoSuchTableException] {
+      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
     }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).isEmpty)
+
+    // partition to rename does not exist
+    intercept[NoSuchPartitionException] {
+      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
     }
+
+    // partition spec in RENAME PARTITION should be case insensitive by default
+    sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
   }
 
   test("drop build-in function") {
@@ -1683,12 +1635,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
 
-    // truncating partitioned data source tables is not supported
     withTable("rectangles", "rectangles2") {
       data.write.saveAsTable("rectangles")
       data.write.partitionBy("length").saveAsTable("rectangles2")
+
+      // not supported since the table is not partitioned
       assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
-      assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+
+      // supported since partitions are stored in the metastore
+      sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+      assert(spark.table("rectangles2").collect().isEmpty)
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2003ff42d4f0cde34b190959a86a296fe96382ef..409c316c6802c0ba7a04ae052a080e1c9d54d36f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
@@ -105,13 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * metastore.
    */
   private def verifyTableProperties(table: CatalogTable): Unit = {
-    val invalidKeys = table.properties.keys.filter { key =>
-      key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
-    }
+    val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
     if (invalidKeys.nonEmpty) {
       throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
-        s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
-        s" ${invalidKeys.mkString("[", ", ", "]")}")
+        s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
+        invalidKeys.mkString("[", ", ", "]"))
     }
     // External users are not allowed to set/switch the table type. In Hive metastore, the table
     // type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
@@ -190,11 +189,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
     // Before saving data source table metadata into Hive metastore, we should:
-    //  1. Put table schema, partition column names and bucket specification in table properties.
+    //  1. Put table provider, schema, partition column names, bucket specification and partition
+    //     provider in table properties.
     //  2. Check if this table is hive compatible
     //    2.1  If it's not hive compatible, set schema, partition columns and bucket spec to empty
     //         and save table metadata to Hive.
-    //    2.1  If it's hive compatible, set serde information in table metadata and try to save
+    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
       // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
@@ -204,6 +204,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
       val tableProperties = new scala.collection.mutable.HashMap[String, String]
       tableProperties.put(DATASOURCE_PROVIDER, provider)
+      if (tableDefinition.partitionProviderIsHive) {
+        tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
+      }
 
       // Serialized JSON schema string may be too long to be stored into a single metastore table
       // property. In this case, we split the JSON string and store each part as a separate table
@@ -241,12 +244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         }
       }
 
-      // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
-      // names and bucket specification to empty.
+      // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+      // bucket specification to empty. Note that partition columns are retained, so that we can
+      // call partition-related Hive API later.
       def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
         tableDefinition.copy(
-          schema = new StructType,
-          partitionColumnNames = Nil,
+          schema = tableDefinition.partitionSchema,
           bucketSpec = None,
           properties = tableDefinition.properties ++ tableProperties)
       }
@@ -419,12 +422,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
       // to retain the spark specific format if it is. Also add old data source properties to table
       // properties, to retain the data source table format.
-      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
+        TABLE_PARTITION_PROVIDER -> "hive"
+      } else {
+        TABLE_PARTITION_PROVIDER -> "builtin"
+      }
       val newDef = withStatsProps.copy(
         schema = oldDef.schema,
         partitionColumnNames = oldDef.partitionColumnNames,
         bucketSpec = oldDef.bucketSpec,
-        properties = oldDataSourceProps ++ withStatsProps.properties)
+        properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
 
       client.alterTable(newDef)
     } else {
@@ -448,7 +456,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * properties, and filter out these special entries from table properties.
    */
   private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
-    val catalogTable = if (table.tableType == VIEW || conf.get(DEBUG_MODE)) {
+    if (conf.get(DEBUG_MODE)) {
+      return table
+    }
+
+    val tableWithSchema = if (table.tableType == VIEW) {
       table
     } else {
       getProviderFromTableProperties(table).map { provider =>
@@ -473,30 +485,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),
           bucketSpec = getBucketSpecFromTableProperties(table),
-          properties = getOriginalTableProperties(table))
+          partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
       } getOrElse {
-        table.copy(provider = Some("hive"))
+        table.copy(provider = Some("hive"), partitionProviderIsHive = true)
       }
     }
+
     // construct Spark's statistics from information in Hive metastore
-    val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
-    if (statsProps.nonEmpty) {
+    val statsProps = tableWithSchema.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    val tableWithStats = if (statsProps.nonEmpty) {
       val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
         .map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
-      val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+      val colStats: Map[String, ColumnStat] = tableWithSchema.schema.collect {
         case f if colStatsProps.contains(f.name) =>
           val numFields = ColumnStatStruct.numStatFields(f.dataType)
           (f.name, ColumnStat(numFields, colStatsProps(f.name)))
       }.toMap
-      catalogTable.copy(
-        properties = removeStatsProperties(catalogTable),
+      tableWithSchema.copy(
         stats = Some(Statistics(
-          sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
-          rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+          sizeInBytes = BigInt(tableWithSchema.properties(STATISTICS_TOTAL_SIZE)),
+          rowCount = tableWithSchema.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
           colStats = colStats)))
     } else {
-      catalogTable
+      tableWithSchema
     }
+
+    tableWithStats.copy(properties = getOriginalTableProperties(table))
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -581,13 +595,30 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Partitions
   // --------------------------------------------------------------------------
 
+  // Hive metastore is not case preserving and the partition columns are always lower cased. We need
+  // to lower case the column names in partition specification before calling partition related Hive
+  // APIs, to match this behaviour.
+  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.map { case (k, v) => k.toLowerCase -> v }
+  }
+
+  // Hive metastore is not case preserving and the column names of the partition specification we
+  // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
+  // partition columns.
+  private def restorePartitionSpec(
+      spec: TablePartitionSpec,
+      partCols: Seq[String]): TablePartitionSpec = {
+    spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
+  }
+
   override def createPartitions(
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.createPartitions(db, table, parts, ignoreIfExists)
+    val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -597,7 +628,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
   }
 
   override def renamePartitions(
@@ -605,21 +636,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
-    client.renamePartitions(db, table, specs, newSpecs)
+    client.renamePartitions(
+      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
   }
 
   override def alterPartitions(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    client.alterPartitions(db, table, newParts)
+    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    client.alterPartitions(db, table, lowerCasedParts)
   }
 
   override def getPartition(
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    client.getPartition(db, table, spec)
+    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
   }
 
   /**
@@ -629,7 +663,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, spec)
+    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    }
   }
 
   /**
@@ -639,14 +675,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
-    client.getPartitions(db, table, partialSpec)
+    client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
+      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    }
   }
 
   override def listPartitionsByFilter(
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
-    val catalogTable = client.getTable(db, table)
+    val rawTable = client.getTable(db, table)
+    val catalogTable = restoreTableMetadata(rawTable)
     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
     val nonPartitionPruningPredicates = predicates.filterNot {
       _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -660,19 +699,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partitionSchema = catalogTable.partitionSchema
 
     if (predicates.nonEmpty) {
-      val clientPrunedPartitions =
-        client.getPartitionsByFilter(catalogTable, predicates)
+      val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+      }
       val boundPredicate =
         InterpretedPredicate.create(predicates.reduce(And).transform {
           case att: AttributeReference =>
             val index = partitionSchema.indexWhere(_.name == att.name)
             BoundReference(index, partitionSchema(index).dataType, nullable = true)
         })
-      clientPrunedPartitions.filter { case p: CatalogTablePartition =>
-        boundPredicate(p.toRow(partitionSchema))
-      }
+      clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
     } else {
-      client.getPartitions(catalogTable)
+      client.getPartitions(catalogTable).map { part =>
+        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+      }
     }
   }
 
@@ -722,7 +762,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 }
 
 object HiveExternalCatalog {
-  val DATASOURCE_PREFIX = "spark.sql.sources."
+  val SPARK_SQL_PREFIX = "spark.sql."
+
+  val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
   val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
   val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
   val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
@@ -736,21 +778,20 @@ object HiveExternalCatalog {
   val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
   val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
 
-  val STATISTICS_PREFIX = "spark.sql.statistics."
+  val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
   val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
   val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
   val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
 
-  def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
-    metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
-  }
+  val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+
 
   def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
     metadata.properties.get(DATASOURCE_PROVIDER)
   }
 
   def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
-    metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+    metadata.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }
   }
 
   // A persisted data source table always store its schema in the catalog.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c1585d5f5617d52fc2b4742d443537ce0359e7e..d1de863ce3623f59d5b304c31b2926e82f2c194c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -76,11 +76,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             partitionColumns = table.partitionColumnNames,
             bucketSpec = table.bucketSpec,
             className = table.provider.get,
-            options = table.storage.properties)
+            options = table.storage.properties,
+            catalogTable = Some(table))
 
-        LogicalRelation(
-          dataSource.resolveRelation(),
-          catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
       }
     }
 
@@ -194,7 +193,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
 
-    val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8835b266b22a41f06d12f8da518b9ba61731debd..84873bbbb81ced19900c54b9bf7624d175abe965 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -777,7 +777,7 @@ private[hive] class HiveClientImpl(
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    if (table.schema.isEmpty) {
+    if (schema.isEmpty) {
       // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
       // set a default serde here (this was done in Hive), and so if the user provides
       // an empty schema Hive would automatically populate the schema with a single
@@ -831,9 +831,6 @@ private[hive] class HiveClientImpl(
     new HivePartition(ht, tpart)
   }
 
-  // TODO (cloud-fan): the column names in partition specification are always lower cased because
-  // Hive metastore is not case preserving. We should normalize them to the actual column names of
-  // the table, once we store partition spec of data source tables.
   private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     CatalogTablePartition(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index d290fe9962db26c6a441b43198f34c7cc1aac863..6e887d95c0f096d2777836390a513a68eed0ba24 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -63,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
 
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5f16960fb149690d077e9807c43a2d54fe73c761
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PartitionProviderCompatibilitySuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils {
+
+  private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
+    spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
+      .partitionBy("partCol")
+      .mode("overwrite")
+      .parquet(dir.getAbsolutePath)
+
+    spark.sql(s"""
+      |create table $tableName (fieldOne long, partCol int)
+      |using parquet
+      |options (path "${dir.getAbsolutePath}")
+      |partitioned by (partCol)""".stripMargin)
+  }
+
+  private def verifyIsLegacyTable(tableName: String): Unit = {
+    val unsupportedCommands = Seq(
+      s"ALTER TABLE $tableName ADD PARTITION (partCol=1) LOCATION '/foo'",
+      s"ALTER TABLE $tableName PARTITION (partCol=1) RENAME TO PARTITION (partCol=2)",
+      s"ALTER TABLE $tableName PARTITION (partCol=1) SET LOCATION '/foo'",
+      s"ALTER TABLE $tableName DROP PARTITION (partCol=1)",
+      s"DESCRIBE $tableName PARTITION (partCol=1)",
+      s"SHOW PARTITIONS $tableName")
+
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      for (cmd <- unsupportedCommands) {
+        val e = intercept[AnalysisException] {
+          spark.sql(cmd)
+        }
+        assert(e.getMessage.contains("partition metadata is not stored in the Hive metastore"), e)
+      }
+    }
+  }
+
+  test("convert partition provider to hive with repair table") {
+    withTable("test") {
+      withTempDir { dir =>
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+          setupPartitionedDatasourceTable("test", dir)
+          assert(spark.sql("select * from test").count() == 5)
+        }
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+          verifyIsLegacyTable("test")
+          spark.sql("msck repair table test")
+          spark.sql("show partitions test").count()  // check we are a new table
+
+          // sanity check table performance
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol < 2").count() == 2)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+        }
+      }
+    }
+  }
+
+  test("when partition management is enabled, new tables have partition provider hive") {
+    withTable("test") {
+      withTempDir { dir =>
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+          setupPartitionedDatasourceTable("test", dir)
+          spark.sql("show partitions test").count()  // check we are a new table
+          assert(spark.sql("select * from test").count() == 0)  // needs repair
+          spark.sql("msck repair table test")
+          assert(spark.sql("select * from test").count() == 5)
+        }
+      }
+    }
+  }
+
+  test("when partition management is disabled, new tables have no partition provider") {
+    withTable("test") {
+      withTempDir { dir =>
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+          setupPartitionedDatasourceTable("test", dir)
+          verifyIsLegacyTable("test")
+          assert(spark.sql("select * from test").count() == 5)
+        }
+      }
+    }
+  }
+
+  test("when partition management is disabled, we preserve the old behavior even for new tables") {
+    withTable("test") {
+      withTempDir { dir =>
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+          setupPartitionedDatasourceTable("test", dir)
+          spark.sql("show partitions test").count()  // check we are a new table
+          spark.sql("refresh table test")
+          assert(spark.sql("select * from test").count() == 0)
+        }
+        // disabled
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+          val e = intercept[AnalysisException] {
+            spark.sql(s"show partitions test")
+          }
+          assert(e.getMessage.contains("filesource partition management is disabled"))
+          spark.sql("refresh table test")
+          assert(spark.sql("select * from test").count() == 5)
+        }
+        // then enabled again
+        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+          spark.sql("refresh table test")
+          assert(spark.sql("select * from test").count() == 0)
+        }
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
similarity index 68%
rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 82ee813c6a95f6799ea74144d6d8b0b925d2185e..476383a5b33a510289d7787beb8b98bcb19e8293 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
-class HiveTablePerfStatsSuite
+class PartitionedTablePerfStatsSuite
   extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
 
   override def beforeEach(): Unit = {
@@ -41,25 +41,54 @@ class HiveTablePerfStatsSuite
     FileStatusCache.resetForTesting()
   }
 
-  private def setupPartitionedTable(tableName: String, dir: File): Unit = {
-    spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
+  private case class TestSpec(setupTable: (String, File) => Unit, isDatasourceTable: Boolean)
+
+  /**
+   * Runs a test against both converted hive and native datasource tables. The test can use the
+   * passed TestSpec object for setup and inspecting test parameters.
+   */
+  private def genericTest(testName: String)(fn: TestSpec => Unit): Unit = {
+    test("hive table: " + testName) {
+      fn(TestSpec(setupPartitionedHiveTable, false))
+    }
+    test("datasource table: " + testName) {
+      fn(TestSpec(setupPartitionedDatasourceTable, true))
+    }
+  }
+
+  private def setupPartitionedHiveTable(tableName: String, dir: File): Unit = {
+    spark.range(5).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
     spark.sql(s"""
-      |create external table $tableName (id long)
+      |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
       |stored as parquet
       |location "${dir.getAbsolutePath}"""".stripMargin)
     spark.sql(s"msck repair table $tableName")
   }
 
-  test("partitioned pruned table reports only selected files") {
+  private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
+    spark.range(5).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
+      .partitionBy("partCol1", "partCol2")
+      .mode("overwrite")
+      .parquet(dir.getAbsolutePath)
+
+    spark.sql(s"""
+      |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
+      |using parquet
+      |options (path "${dir.getAbsolutePath}")
+      |partitioned by (partCol1, partCol2)""".stripMargin)
+    spark.sql(s"msck repair table $tableName")
+  }
+
+  genericTest("partitioned pruned table reports only selected files") { spec =>
     assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
     withTable("test") {
       withTempDir { dir =>
-        setupPartitionedTable("test", dir)
+        spec.setupTable("test", dir)
         val df = spark.sql("select * from test")
         assert(df.count() == 5)
         assert(df.inputFiles.length == 5)  // unpruned
@@ -75,17 +104,24 @@ class HiveTablePerfStatsSuite
         val df4 = spark.sql("select * from test where partCol1 = 999")
         assert(df4.count() == 0)
         assert(df4.inputFiles.length == 0)
+
+        // TODO(ekl) enable for hive tables as well once SPARK-17983 is fixed
+        if (spec.isDatasourceTable) {
+          val df5 = spark.sql("select * from test where fieldOne = 4")
+          assert(df5.count() == 1)
+          assert(df5.inputFiles.length == 5)
+        }
       }
     }
   }
 
-  test("lazy partition pruning reads only necessary partition data") {
+  genericTest("lazy partition pruning reads only necessary partition data") { spec =>
     withSQLConf(
-        SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
+        SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
         SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedTable("test", dir)
+          spec.setupTable("test", dir)
           HiveCatalogMetrics.reset()
           spark.sql("select * from test where partCol1 = 999").count()
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
@@ -120,13 +156,13 @@ class HiveTablePerfStatsSuite
     }
   }
 
-  test("lazy partition pruning with file status caching enabled") {
+  genericTest("lazy partition pruning with file status caching enabled") { spec =>
     withSQLConf(
-        "spark.sql.hive.filesourcePartitionPruning" -> "true",
-        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+        SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
+        SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "9999999") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedTable("test", dir)
+          spec.setupTable("test", dir)
           HiveCatalogMetrics.reset()
           assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
@@ -161,13 +197,13 @@ class HiveTablePerfStatsSuite
     }
   }
 
-  test("file status caching respects refresh table and refreshByPath") {
+  genericTest("file status caching respects refresh table and refreshByPath") { spec =>
     withSQLConf(
-        "spark.sql.hive.filesourcePartitionPruning" -> "true",
-        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+        SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
+        SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "9999999") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedTable("test", dir)
+          spec.setupTable("test", dir)
           HiveCatalogMetrics.reset()
           assert(spark.sql("select * from test").count() == 5)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
@@ -190,13 +226,13 @@ class HiveTablePerfStatsSuite
     }
   }
 
-  test("file status cache respects size limit") {
+  genericTest("file status cache respects size limit") { spec =>
     withSQLConf(
-        "spark.sql.hive.filesourcePartitionPruning" -> "true",
-        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
+        SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true",
+        SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "1" /* 1 byte */) {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedTable("test", dir)
+          spec.setupTable("test", dir)
           HiveCatalogMetrics.reset()
           assert(spark.sql("select * from test").count() == 5)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
@@ -209,11 +245,11 @@ class HiveTablePerfStatsSuite
     }
   }
 
-  test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
+  test("hive table: files read and cached when filesource partition management is off") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedTable("test", dir)
+          setupPartitionedHiveTable("test", dir)
 
           // We actually query the partitions from hive each time the table is resolved in this
           // mode. This is kind of terrible, but is needed to preserve the legacy behavior
@@ -237,4 +273,32 @@ class HiveTablePerfStatsSuite
       }
     }
   }
+
+  test("datasource table: all partition data cached in memory when partition management is off") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
+
+          // not using metastore
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+
+          // reads and caches all the files initially
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index c351063a63ff8a0e587e3948a7841681674f88d8..4f5ebc3d838b9d1dd7e76b51d22abb58ec7da130 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -310,39 +310,50 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
     }
   }
 
-  test("test table-level statistics for data source table created in HiveExternalCatalog") {
-    val parquetTable = "parquetTable"
-    withTable(parquetTable) {
-      sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) USING PARQUET")
-      val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(parquetTable))
-      assert(DDLUtils.isDatasourceTable(catalogTable))
+  private def testUpdatingTableStats(tableDescription: String, createTableCmd: String): Unit = {
+    test("test table-level statistics for " + tableDescription) {
+      val parquetTable = "parquetTable"
+      withTable(parquetTable) {
+        sql(createTableCmd)
+        val catalogTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(parquetTable))
+        assert(DDLUtils.isDatasourceTable(catalogTable))
+
+        sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+        checkTableStats(
+          parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
 
-      sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
-      checkTableStats(
-        parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+        // noscan won't count the number of rows
+        sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+        val fetchedStats1 = checkTableStats(
+          parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
 
-      // noscan won't count the number of rows
-      sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
-      val fetchedStats1 = checkTableStats(
-        parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+        sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+        sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
+        val fetchedStats2 = checkTableStats(
+          parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+        assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
 
-      sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
-      sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
-      val fetchedStats2 = checkTableStats(
-        parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
-      assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
-
-      // without noscan, we count the number of rows
-      sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
-      val fetchedStats3 = checkTableStats(
-        parquetTable,
-        isDataSourceTable = true,
-        hasSizeInBytes = true,
-        expectedRowCounts = Some(1000))
-      assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
+        // without noscan, we count the number of rows
+        sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+        val fetchedStats3 = checkTableStats(
+          parquetTable,
+          isDataSourceTable = true,
+          hasSizeInBytes = true,
+          expectedRowCounts = Some(1000))
+        assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
+      }
     }
   }
 
+  testUpdatingTableStats(
+    "data source table created in HiveExternalCatalog",
+    "CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET")
+
+  testUpdatingTableStats(
+    "partitioned data source table",
+    "CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET PARTITIONED BY (key)")
+
   test("statistics collection of a table with zero column") {
     val table_no_cols = "table_no_cols"
     withTable(table_no_cols) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index ad1e9b17a9f71f4613f0ffda82c544bc68d70a6c..46ed18c70fb56b7645211bd1792b573a2a8b4388 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -415,10 +415,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         .mode(SaveMode.Overwrite)
         .saveAsTable("part_datasrc")
 
-      val message1 = intercept[AnalysisException] {
-        sql("SHOW PARTITIONS part_datasrc")
-      }.getMessage
-      assert(message1.contains("is not allowed on a datasource table"))
+      assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 495b4f874a1d615f0487c2c4a996843573a54148..01fa827220c519164686cbafd74cd9fea0093c40 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -358,7 +358,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         "# Partition Information",
         "# col_name",
         "Detailed Partition Information CatalogPartition(",
-        "Partition Values: [Us, 1]",
+        "Partition Values: [c=Us, d=1]",
         "Storage(Location:",
         "Partition Parameters")
 
@@ -399,10 +399,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         .range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
         .partitionBy("d")
         .saveAsTable("datasource_table")
-      val m4 = intercept[AnalysisException] {
-        sql("DESC datasource_table PARTITION (d=2)")
-      }.getMessage()
-      assert(m4.contains("DESC PARTITION is not allowed on a datasource table"))
+
+      sql("DESC datasource_table PARTITION (d=0)")
 
       val m5 = intercept[AnalysisException] {
         spark.range(10).select('id as 'a, 'id as 'b).createTempView("view1")