From fb41670c9263a89ec233861cc91a19cf1bb19073 Mon Sep 17 00:00:00 2001
From: petermaxlee <petermaxlee@gmail.com>
Date: Thu, 30 Jun 2016 16:49:59 -0700
Subject: [PATCH] [SPARK-16336][SQL] Suggest doing table refresh upon
 FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message suggesting that users run `REFRESH TABLE` or re-create the Dataset/DataFrame when Spark hits a FileNotFoundException caused by stale, cached metadata.
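
For context, the recovery that the new message points users to looks roughly like this (a minimal sketch; `my_table` and the path are hypothetical names, not part of this patch):

```scala
// After the underlying files change and a read fails with the new
// FileNotFoundException message, either refresh the cached metadata...
spark.sql("REFRESH TABLE my_table")      // SQL route
spark.catalog.refreshTable("my_table")   // programmatic route

// ...or re-create the Dataset/DataFrame so the file listing is rebuilt.
val df = spark.read.parquet("/path/to/data")
```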

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14003 from petermaxlee/SPARK-16336.
---
 .../execution/datasources/FileScanRDD.scala   | 15 +++-
 .../apache/spark/sql/MetadataCacheSuite.scala | 88 +++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 1443057d5c..c66da3a831 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -117,7 +117,20 @@ class FileScanRDD(
           currentFile = files.next()
           logInfo(s"Reading File $currentFile")
           InputFileNameHolder.setInputFileName(currentFile.filePath)
-          currentIterator = readFunction(currentFile)
+
+          try {
+            currentIterator = readFunction(currentFile)
+          } catch {
+            case e: java.io.FileNotFoundException =>
+              throw new java.io.FileNotFoundException(
+                e.getMessage + "\n" +
+                  "It is possible the underlying files have been updated. " +
+                  "You can explicitly invalidate the cache in Spark by " +
+                  "running 'REFRESH TABLE tableName' command in SQL or " +
+                  "by recreating the Dataset/DataFrame involved."
+              )
+          }
+
           hasNext
         } else {
           currentFile = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 0000000000..d872f4baa6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite for metadata cache related issues.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+    assert(dir.isDirectory)
+    val oneFile = dir.listFiles().find { file =>
+      !file.getName.startsWith("_") && !file.getName.startsWith(".")
+    }
+    assert(oneFile.isDefined)
+    oneFile.foreach(_.delete())
+  }
+
+  test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+        .write.parquet(location.getAbsolutePath)
+
+      // Read the directory in
+      val df = spark.read.parquet(location.getAbsolutePath)
+      assert(df.count() == 100)
+
+      // Delete a file
+      deleteOneFileInDirectory(location)
+
+      // Read it again and now we should see a FileNotFoundException
+      val e = intercept[SparkException] {
+        df.count()
+      }
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("REFRESH"))
+    }
+  }
+
+  ignore("SPARK-16337 temporary view refresh") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+        .write.parquet(location.getAbsolutePath)
+
+      // Read the directory in
+      spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+      assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+      // Delete a file
+      deleteOneFileInDirectory(location)
+
+      // Read it again and now we should see a FileNotFoundException
+      val e = intercept[SparkException] {
+        sql("select count(*) from view_refresh").first()
+      }
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("refresh()"))
+
+      // Refresh and we should be able to read it again.
+      spark.catalog.refreshTable("view_refresh")
+      val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+      assert(newCount > 0 && newCount < 100)
+    }
+  }
+}
-- 
GitLab