diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 422700c89194a35eff66cce833db9b5bd206700c..193a2a2cdc1708e7a8af6fff5ee2b8fa4c89e80d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val pathOption = table.storage.locationUri.map("path" -> _)
+    // Fill in table defaults (current database, partition tracking) from the session state
+    val tableWithDefaultOptions = table.copy(
+      identifier = table.identifier.copy(
+        database = Some(
+          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        partitionColumns = table.partitionColumnNames,
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption).resolveRelation()
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
 
     dataSource match {
       case fs: HadoopFsRelation =>
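
The change above hands DataSource the declared partition columns plus a catalog table stub whose database and partition-tracking flag have been defaulted, so creating a partitioned data source table no longer triggers a full file listing just to resolve the relation. A user-level sketch of the intended effect, with partition management enabled (SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS, as in the tests below) and an illustrative table name and path:

    // Creating the table should no longer scan the filesystem; partition
    // metadata is filled in later, e.g. by MSCK REPAIR TABLE.
    spark.sql("""
      |create table events (fieldOne long, partCol1 int, partCol2 int)
      |using parquet
      |options (path "/tmp/events")
      |partitioned by (partCol1, partCol2)""".stripMargin)
    spark.sql("msck repair table events")  // run only when partition metadata is needed
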
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ccfc759c8fa7ead98cca78c09f914e47b816a9f8..f47eb84df02881753fd0d5d87b5b604b26bfdccc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,7 +132,7 @@ case class DataSource(
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
     }
-    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+    val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
       val resolved = tempFileIndex.partitionSchema.map { partitionField =>
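
With the partition columns now passed in from CreateDataSourceTableCommand, this inference branch is only taken when no partition columns were specified at all; a table created with an explicit PARTITIONED BY clause falls through to the user-specified path and needs no file listing. A rough sketch of the resulting control flow (the helper names below are illustrative placeholders, not the actual DataSource internals):

    val partitionSchema = if (partitionColumns.isEmpty) {
      // no declared partition columns: fall back to inferring them from the
      // directory layout, which requires listing files
      inferPartitionSchema(tempFileIndex)
    } else {
      // CREATE TABLE ... PARTITIONED BY (...) lands here; the partition column
      // types come from the user-specified schema, so no scan is needed
      partitionSchemaFrom(userSpecifiedSchema, partitionColumns)
    }
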
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2a004ba2f12d5ff4a36cd1130c02af23e2d441df..e61beb49e47f4a53fc0b34714dc2e95a476a41da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           pathToNonPartitionedTable,
           userSpecifiedSchema = Option("num int, str string"),
           userSpecifiedPartitionCols = partitionCols,
-          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedSchema = if (partitionCols.isDefined) {
+            // partition inference was skipped, so the partition column is moved to the end
+            new StructType().add("str", StringType).add("num", IntegerType)
+          } else {
+            // no inferred partitioning, so schema is in original order
+            new StructType().add("num", IntegerType).add("str", StringType)
+          },
           expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
       }
     }
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.provider == Some("parquet"))
-      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+      // a is ordered last since it is a user-specified partitioning column
+      assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
       assert(table.partitionColumnNames == Seq("a"))
     }
   }
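
The updated assertions reflect the new column ordering: a user-specified partitioning column is appended to the end of the table schema rather than keeping its declared position. A small, hedged illustration of the expected result (table name is arbitrary):

    spark.sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
    // The partitioning column "a" is now reported last, so the schema is (b, a)
    // and partitionColumnNames is Seq("a").
    spark.table("tbl").schema.fieldNames   // expected: Array("b", "a")
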
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 9838b9a4eba3dbcb8d66d82f8380bf04a06ec8ab..65c02d473b79d2dad1d35171dbd7ff045ccccb7e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -60,36 +60,52 @@ class PartitionedTablePerfStatsSuite
     setupPartitionedHiveTable(tableName, dir, 5)
   }
 
-  private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedHiveTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
       |stored as parquet
       |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }
 
   private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
     setupPartitionedDatasourceTable(tableName, dir, 5)
   }
 
-  private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedDatasourceTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
       |options (path "${dir.getAbsolutePath}")
       |partitioned by (partCol1, partCol2)""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }
 
   genericTest("partitioned pruned table reports only selected files") { spec =>
@@ -250,6 +266,33 @@ class PartitionedTablePerfStatsSuite
     }
   }
 
+  test("datasource table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("hive table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          HiveCatalogMetrics.reset()
+          setupPartitionedHiveTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
   test("hive table: num hive client calls does not scale with partition count") {
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {