Skip to content
Snippets Groups Projects
Commit fb41670c authored by petermaxlee's avatar petermaxlee Committed by Reynold Xin
Browse files

[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message to suggest users running refresh table or reloading data frames when Spark sees a FileNotFoundException due to stale, cached metadata.

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14003 from petermaxlee/SPARK-16336.
parent 5d00a7bc
No related branches found
No related tags found
No related merge requests found
......@@ -117,7 +117,20 @@ class FileScanRDD(
currentFile = files.next()
logInfo(s"Reading File $currentFile")
InputFileNameHolder.setInputFileName(currentFile.filePath)
currentIterator = readFunction(currentFile)
try {
currentIterator = readFunction(currentFile)
} catch {
case e: java.io.FileNotFoundException =>
throw new java.io.FileNotFoundException(
e.getMessage + "\n" +
"It is possible the underlying files have been updated. " +
"You can explicitly invalidate the cache in Spark by " +
"running 'REFRESH TABLE tableName' command in SQL or " +
"by recreating the Dataset/DataFrame involved."
)
}
hasNext
} else {
currentFile = null
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import java.io.File
import org.apache.spark.SparkException
import org.apache.spark.sql.test.SharedSQLContext
/**
* Test suite to handle metadata cache related.
*/
class MetadataCacheSuite extends QueryTest with SharedSQLContext {
/** Removes one data file in the given directory. */
private def deleteOneFileInDirectory(dir: File): Unit = {
assert(dir.isDirectory)
val oneFile = dir.listFiles().find { file =>
!file.getName.startsWith("_") && !file.getName.startsWith(".")
}
assert(oneFile.isDefined)
oneFile.foreach(_.delete())
}
test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.parquet(location.getAbsolutePath)
// Read the directory in
val df = spark.read.parquet(location.getAbsolutePath)
assert(df.count() == 100)
// Delete a file
deleteOneFileInDirectory(location)
// Read it again and now we should see a FileNotFoundException
val e = intercept[SparkException] {
df.count()
}
assert(e.getMessage.contains("FileNotFoundException"))
assert(e.getMessage.contains("REFRESH"))
}
}
ignore("SPARK-16337 temporary view refresh") {
withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.parquet(location.getAbsolutePath)
// Read the directory in
spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
// Delete a file
deleteOneFileInDirectory(location)
// Read it again and now we should see a FileNotFoundException
val e = intercept[SparkException] {
sql("select count(*) from view_refresh").first()
}
assert(e.getMessage.contains("FileNotFoundException"))
assert(e.getMessage.contains("refresh()"))
// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
val newCount = sql("select count(*) from view_refresh").first().getLong(0)
assert(newCount > 0 && newCount < 100)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment