        // This table fetch is to fill the cache with zero leaf files

             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
             |INTO TABLE tab


In the above example, the returned result is empty after table loading. The metadata cache could be out of dated after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only `parquet` and `orc` formats are facing such issues, because the Hive serde tables in the format of  parquet/orc could be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on.

This PR is to refresh the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path is using the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means, writing the partitioned parquet/orc tables still use `InsertIntoHiveTable`, instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-dated cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note, it does not need to refresh the cache for non-partitioned parquet/orc tables, because it does not call `InsertIntoHiveTable` at all. Based on the comments, this PR will keep the existing logics unchanged. That means, we always refresh the table no matter whether the table is partitioned or not.

Added test cases in parquetSuites.scala

         holdDDLTime = false)
+    // Refresh the metadata cache to ensure the data visible to the users
+    catalog.refreshTable(targetTable.identifier)
         val logicalRelation = cached.getOrElse {
           val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
-          val fileCatalog = {
-            val catalog = new CatalogFileIndex(
+          val fileIndex = {
+            val index = new CatalogFileIndex(
               sparkSession, metastoreRelation.catalogTable, sizeInBytes)
             if (lazyPruningEnabled) {
-              catalog
+              index
             } else {
-              catalog.filterPartitions(Nil)  // materialize all the partitions in memory
+              index.filterPartitions(Nil)  // materialize all the partitions in memory
           val partitionSchemaColumnNames =
@@ -248,7 +248,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               .filterNot(field => partitionSchemaColumnNames.contains(
           val relation = HadoopFsRelation(
-            location = fileCatalog,
+            location = fileIndex,
             partitionSchema = partitionSchema,
             dataSchema = dataSchema,
             bucketSpec = bucketSpec,
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-       "test_parquet")
+      "test_parquet")
+    super.afterAll()
   test(s"conversion is working") {
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
         // Create partition without data files and check whether it can be read
         sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
         // Add data files to partition directory and check whether they can be read
         sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
         // Check it with pruning predicates
           sql("SELECT * FROM test_added_partitions where b = 0"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
           sql("SELECT * FROM test_added_partitions where b = 1"),
-          Seq(("baz", 1)).toDF("a", "b"))
+          Seq(Row("baz", 1)))
           sql("SELECT * FROM test_added_partitions where b = 2"),
-          Seq[(String, Int)]().toDF("a", "b"))
+          Seq.empty)
         // Also verify the inputFiles implementation
         assert(sql("select * from test_added_partitions").inputFiles.length == 2)
+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin ="t1").crossJoin("t2"))