diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ac6c3a89dbd086593cd869bcd4e8bf681b89654b..246894813c3b9f3137333c5f830f8229d51550b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -317,6 +317,10 @@ case class LoadDataCommand(
         holdDDLTime = false,
         isSrcLocal = isLocal)
     }
+
+    // Refresh the metadata cache to ensure the data visible to the users
+    catalog.refreshTable(targetTable.identifier)
+
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0c110d350054016f354a9fa731711720a5e2fa31..e4b1f6ae3e49e021041c798367b8e64705bdc7fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -246,13 +246,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         val logicalRelation = cached.getOrElse {
           val sizeInBytes =
             metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
-          val fileCatalog = {
-            val catalog = new CatalogFileIndex(
+          val fileIndex = {
+            val index = new CatalogFileIndex(
               sparkSession, metastoreRelation.catalogTable, sizeInBytes)
             if (lazyPruningEnabled) {
-              catalog
+              index
             } else {
-              catalog.filterPartitions(Nil)  // materialize all the partitions in memory
+              index.filterPartitions(Nil)  // materialize all the partitions in memory
             }
           }
           val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -261,7 +261,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
           val relation = HadoopFsRelation(
-            location = fileCatalog,
+            location = fileIndex,
             partitionSchema = partitionSchema,
             dataSchema = dataSchema,
             bucketSpec = bucketSpec,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d3e04d4bf2e0efa145a5113da22187a5540d3bfb..aa4a150a4b807ba6e41ba79ce47d7b39d1ae1ffa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       "normal_parquet",
       "jt",
       "jt_array",
-       "test_parquet")
+      "test_parquet")
+    super.afterAll()
   }
 
   test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
 
         // Create partition without data files and check whether it can be read
         sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
 
         // Add data files to partition directory and check whether they can be read
         sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
-          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
 
         // Check it with pruning predicates
         checkAnswer(
           sql("SELECT * FROM test_added_partitions where b = 0"),
-          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+          Seq(Row("foo", 0), Row("bar", 0)))
         checkAnswer(
           sql("SELECT * FROM test_added_partitions where b = 1"),
-          Seq(("baz", 1)).toDF("a", "b"))
+          Seq(Row("baz", 1)))
         checkAnswer(
           sql("SELECT * FROM test_added_partitions where b = 2"),
-          Seq[(String, Int)]().toDF("a", "b"))
+          Seq.empty)
 
         // Also verify the inputFiles implementation
         assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
+
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin = table.as("t1").crossJoin(table.as("t2"))