diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 49c07427617f3fca0b23cb850bd29edc678526ff..a99bc3bff6eeaf6ca63835bb03802bddff67d1ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -211,4 +211,17 @@ abstract class Catalog {
    */
   def clearCache(): Unit
 
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+   *
+   * If this table is cached as an InMemoryRelation, drop the original cached version and make the
+   * new version cached lazily.
+   *
+   * @since 2.0.0
+   */
+  def refreshTable(tableName: String): Unit
+
 }
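
A minimal usage sketch for the new public API (the table name "my_table" and the external-rewrite scenario are hypothetical; spark is an existing SparkSession):

    // An external process has rewritten the files backing "my_table";
    // Spark's cached metadata (e.g. block locations) is now stale.
    spark.catalog.refreshTable("my_table")
    // The next action sees the new data; if the table was cached as an
    // InMemoryRelation, it is re-cached lazily on first access.
    spark.table("my_table").show()
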
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 68238dbb46e9f034a5c1312f39d6dc7a90177104..78b1db16826e23d5698fa144451718eda26e715b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // Refresh the given table's metadata first.
-    sparkSession.sessionState.catalog.refreshTable(tableIdent)
-
-    // If this table is cached as a InMemoryColumnarRelation, drop the original
-    // cached version and make the new version cached lazily.
-    val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
-    // Use lookupCachedData directly since RefreshTable also takes databaseName.
-    val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
-    if (isCached) {
-      // Create a data frame to represent the table.
-      // TODO: Use uncacheTable once it supports database name.
-      val df = Dataset.ofRows(sparkSession, logicalPlan)
-      // Uncache the logicalPlan.
-      sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
-      // Cache it again.
-      sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
-    }
-
+    // Refresh the given table's metadata. If this table is cached as an InMemoryRelation,
+    // drop the original cached version and make the new version cached lazily.
+    sparkSession.catalog.refreshTable(tableIdent.quotedString)
     Seq.empty[Row]
   }
 }
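
The same code path is reachable from SQL, since REFRESH TABLE parses into the RefreshTable command above; a sketch with a hypothetical table name:

    // Both lines are now equivalent: the command delegates to the
    // public catalog API, so the caching behaviour matches exactly.
    spark.sql("REFRESH TABLE my_db.my_table")
    spark.catalog.refreshTable("my_db.my_table")
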
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 473e827f4db18b66cd2296c99345e540226873ef..1371abe189f84bfd7169aad7a5fd4f9732548edb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -345,6 +345,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
   }
 
+  /**
+   * Refresh the cache entry for a table, if any. For a Hive metastore table, the metadata
+   * is refreshed.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def refreshTable(tableName: String): Unit = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sessionCatalog.refreshTable(tableIdent)
+
+    // If this table is cached as an InMemoryRelation, drop the original
+    // cached version and make the new version cached lazily.
+    val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
+    // Use lookupCachedData directly since refreshTable also accepts a database-qualified name.
+    val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    if (isCached) {
+      // Create a data frame to represent the table.
+      // TODO: Use uncacheTable once it supports database name.
+      val df = Dataset.ofRows(sparkSession, logicalPlan)
+      // Uncache the logicalPlan.
+      sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
+      // Cache it again.
+      sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
+    }
+  }
+
 }
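
In user-facing terms, the cached-table branch above behaves roughly like the sketch below (assuming a cached table "t"; the internal code goes through cacheManager directly only because uncacheTable does not yet support database names, per the TODO):

    // Rough public-API equivalent of the cached-table branch:
    val df = spark.table("t")
    df.unpersist(blocking = true) // drop the stale InMemoryRelation now
    df.persist()                  // re-mark for caching; nothing is
                                  // materialized until the next action
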
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f0b8a83dee8ca834b39c576cea19f9fc175a37c9..8f7c6f5d0ca44917ad96d48739e9e40641b561c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -162,10 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 
   def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
 
-  def refreshTable(tableName: String): Unit = {
-    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
-  }
-
   def invalidateTable(tableName: String): Unit = {
     catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 00adb9a44b14e33b3a2f34fe5fa7d5e840d834f5..686c63065dfcca6ff8e75ba914961fe45ad19e2f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .mode(SaveMode.Append)
         .saveAsTable("arrayInParquet")
 
-      sessionState.refreshTable("arrayInParquet")
+      sparkSession.catalog.refreshTable("arrayInParquet")
 
       checkAnswer(
         sql("SELECT a FROM arrayInParquet"),
@@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .mode(SaveMode.Append)
         .saveAsTable("mapInParquet")
 
-      sessionState.refreshTable("mapInParquet")
+      sparkSession.catalog.refreshTable("mapInParquet")
 
       checkAnswer(
         sql("SELECT a FROM mapInParquet"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 622b043581c5d606ae408e08d44ea6c868b4ce26..5b706b0432418e006b10c160289a6eb8a80eaff6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -217,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
           df.write.parquet(s"$path/p=2")
           sql("ALTER TABLE t ADD PARTITION (p=2)")
-          hiveContext.sessionState.refreshTable("t")
+          spark.catalog.refreshTable("t")
           checkAnswer(
             spark.table("t"),
             df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -249,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
         df.write.parquet(s"$path/p=2")
         sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
-        hiveContext.sessionState.refreshTable(s"$db.t")
+        spark.catalog.refreshTable(s"$db.t")
         checkAnswer(
           spark.table(s"$db.t"),
           df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 75166f6beaa8798c9e99e3112b796cd1d7e31a23..415d4c0049d4076a32f75eaff51c1bdd6af2b07a 100644
--- a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -58,4 +58,16 @@ class HiveContext private[hive](
     sparkSession.sharedState.asInstanceOf[HiveSharedState]
   }
 
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+   *
+   * @since 1.3.0
+   */
+  def refreshTable(tableName: String): Unit = {
+    sparkSession.catalog.refreshTable(tableName)
+  }
+
 }
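
For pre-2.0 callers this keeps the old entry point working; a sketch (hiveContext is an existing HiveContext from this compatibility module, the table name is hypothetical):

    // Legacy code continues to compile and behave the same, since the
    // method simply forwards to sparkSession.catalog.refreshTable:
    hiveContext.refreshTable("my_db.my_table")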