From 5959df217df53196607b7fa744cdc2b36311360e Mon Sep 17 00:00:00 2001 From: gatorsmile <gatorsmile@gmail.com> Date: Mon, 8 Aug 2016 22:34:28 +0800 Subject: [PATCH] [SPARK-16936][SQL] Case Sensitivity Support for Refresh Temp Table ### What changes were proposed in this pull request? Currently, the `refreshTable` API is always case sensitive. When users pass a view name whose case does not exactly match, the API silently ignores the call. Users might assume that the command completed successfully. However, when they run subsequent SQL commands, they might still get an exception such as: ``` Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 7, localhost): java.io.FileNotFoundException: File file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-bd4b9ea6-9aec-49c5-8f05-01cff426211e/part-r-00000-0c84b915-c032-4f2e-abf5-1d48fdbddf38.snappy.parquet does not exist ``` This PR fixes the issue by passing the temp table name through `formatTableName` before looking it up in `tempTables`. ### How was this patch tested? Added a test case to `MetadataCacheSuite`. Author: gatorsmile <gatorsmile@gmail.com> Closes #14523 from gatorsmile/refreshTempTable. --- .../sql/catalyst/catalog/SessionCatalog.scala | 4 +-- .../spark/sql/execution/SparkSqlParser.scala | 2 +- .../apache/spark/sql/MetadataCacheSuite.scala | 25 +++++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index fabab32592..00c3db0aac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -491,7 +491,7 @@ class SessionCatalog( // If the database is defined, this is definitely not a temp table. // If the database is not defined, there is a good chance this is a temp table. 
if (name.database.isEmpty) { - tempTables.get(name.table).foreach(_.refresh()) + tempTables.get(formatTableName(name.table)).foreach(_.refresh()) } } @@ -508,7 +508,7 @@ class SessionCatalog( * For testing only. */ private[catalog] def getTempTable(name: String): Option[LogicalPlan] = synchronized { - tempTables.get(name) + tempTables.get(formatTableName(name)) } // ---------------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c3e3b215bb..2a452f4379 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1212,7 +1212,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * * For example: * {{{ - * CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name + * CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name * [(column_name [COMMENT column_comment], ...) 
] * [COMMENT view_comment] * [TBLPROPERTIES (property_name = property_value, ...)] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala index eacf254cd1..98aa447fc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.io.File import org.apache.spark.SparkException +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext /** @@ -85,4 +86,28 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext { assert(newCount > 0 && newCount < 100) }} } + + test("case sensitivity support in temporary view refresh") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + withTempView("view_refresh") { + withTempPath { (location: File) => + // Create a Parquet directory + spark.range(start = 0, end = 100, step = 1, numPartitions = 3) + .write.parquet(location.getAbsolutePath) + + // Read the directory in + spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh") + + // Delete a file + deleteOneFileInDirectory(location) + intercept[SparkException](sql("select count(*) from view_refresh").first()) + + // Refresh and we should be able to read it again. + spark.catalog.refreshTable("vIeW_reFrEsH") + val newCount = sql("select count(*) from view_refresh").first().getLong(0) + assert(newCount > 0 && newCount < 100) + } + } + } + } } -- GitLab