From 39fd469078271aa12f3163606000e06e382d35dc Mon Sep 17 00:00:00 2001 From: gatorsmile <gatorsmile@gmail.com> Date: Fri, 20 May 2016 14:38:25 +0800 Subject: [PATCH] [SPARK-15367][SQL] Add refreshTable back #### What changes were proposed in this pull request? `refreshTable` was a method in `HiveContext`. It was deleted accidentally while we were migrating the APIs. This PR is to add it back to `HiveContext`. In addition, in `SparkSession`, we put it under the catalog namespace (`SparkSession.catalog.refreshTable`). #### How was this patch tested? Changed the existing test cases to use the function `refreshTable`. Also added a test case for refreshTable in `hivecontext-compatibility` Author: gatorsmile <gatorsmile@gmail.com> Closes #13156 from gatorsmile/refreshTable. --- .../apache/spark/sql/catalog/Catalog.scala | 13 +++++++++ .../spark/sql/execution/datasources/ddl.scala | 21 +++------------ .../spark/sql/internal/CatalogImpl.scala | 27 +++++++++++++++++++ .../spark/sql/internal/SessionState.scala | 4 --- .../sql/hive/MetastoreDataSourcesSuite.scala | 4 +-- .../spark/sql/hive/MultiDatabaseSuite.scala | 4 +-- .../apache/spark/sql/hive/HiveContext.scala | 12 +++++++++ 7 files changed, 59 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 49c0742761..a99bc3bff6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -211,4 +211,17 @@ abstract class Catalog { */ def clearCache(): Unit + /** + * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, + * Spark SQL or the external data source library it uses might cache certain metadata about a + * table, such as the location of blocks. When those change outside of Spark SQL, users should + * call this function to invalidate the cache. + * + * If this table is cached as an InMemoryRelation, drop the original cached version and make the + * new version cached lazily. + * + * @since 2.0.0 + */ + def refreshTable(tableName: String): Unit + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 68238dbb46..78b1db1682 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - // Refresh the given table's metadata first. - sparkSession.sessionState.catalog.refreshTable(tableIdent) - - // If this table is cached as a InMemoryColumnarRelation, drop the original - // cached version and make the new version cached lazily. - val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent) - // Use lookupCachedData directly since RefreshTable also takes databaseName. - val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty - if (isCached) { - // Create a data frame to represent the table. - // TODO: Use uncacheTable once it supports database name. - val df = Dataset.ofRows(sparkSession, logicalPlan) - // Uncache the logicalPlan. - sparkSession.cacheManager.tryUncacheQuery(df, blocking = true) - // Cache it again. - sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table)) - } - + // Refresh the given table's metadata. If this table is cached as an InMemoryRelation, + // drop the original cached version and make the new version cached lazily. + sparkSession.catalog.refreshTable(tableIdent.quotedString) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 473e827f4d..1371abe189 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -345,6 +345,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { sparkSession.cacheManager.lookupCachedData(qName).nonEmpty } + /** + * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata + * is refreshed. + * + * @group cachemgmt + * @since 2.0.0 + */ + override def refreshTable(tableName: String): Unit = { + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + sessionCatalog.refreshTable(tableIdent) + + // If this table is cached as a InMemoryRelation, drop the original + // cached version and make the new version cached lazily. + val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent) + // Use lookupCachedData directly since RefreshTable also takes databaseName. + val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty + if (isCached) { + // Create a data frame to represent the table. + // TODO: Use uncacheTable once it supports database name. + val df = Dataset.ofRows(sparkSession, logicalPlan) + // Uncache the logicalPlan. + sparkSession.cacheManager.tryUncacheQuery(df, blocking = true) + // Cache it again. + sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table)) + } + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index f0b8a83dee..8f7c6f5d0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -162,10 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) { def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan) - def refreshTable(tableName: String): Unit = { - catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) - } - def invalidateTable(tableName: String): Unit = { catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 00adb9a44b..686c63065d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .mode(SaveMode.Append) .saveAsTable("arrayInParquet") - sessionState.refreshTable("arrayInParquet") + sparkSession.catalog.refreshTable("arrayInParquet") checkAnswer( sql("SELECT a FROM arrayInParquet"), @@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .mode(SaveMode.Append) .saveAsTable("mapInParquet") - sessionState.refreshTable("mapInParquet") + sparkSession.catalog.refreshTable("mapInParquet") checkAnswer( sql("SELECT a FROM mapInParquet"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 622b043581..5b706b0432 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -217,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle df.write.parquet(s"$path/p=2") sql("ALTER TABLE t ADD PARTITION (p=2)") - hiveContext.sessionState.refreshTable("t") + spark.catalog.refreshTable("t") checkAnswer( spark.table("t"), df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2)))) @@ -249,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle df.write.parquet(s"$path/p=2") sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)") - hiveContext.sessionState.refreshTable(s"$db.t") + spark.catalog.refreshTable(s"$db.t") checkAnswer( spark.table(s"$db.t"), df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2)))) diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 75166f6bea..415d4c0049 100644 --- a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -58,4 +58,16 @@ class HiveContext private[hive]( sparkSession.sharedState.asInstanceOf[HiveSharedState] } + /** + * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, + * Spark SQL or the external data source library it uses might cache certain metadata about a + * table, such as the location of blocks. When those change outside of Spark SQL, users should + * call this function to invalidate the cache. + * + * @since 1.3.0 + */ + def refreshTable(tableName: String): Unit = { + sparkSession.catalog.refreshTable(tableName) + } + } -- GitLab