Skip to content
Snippets Groups Projects
Commit 5f20ae03 authored by Eric Liang's avatar Eric Liang Committed by Wenchen Fan
Browse files

[SPARK-17980][SQL] Fix refreshByPath for converted Hive tables

## What changes were proposed in this pull request?

There was a bug introduced in https://github.com/apache/spark/pull/14690 which broke refreshByPath with converted hive tables (though, it turns out it was very difficult to refresh converted hive tables anyways, since you had to specify the exact path of one of the partitions).

This changes refreshByPath to invalidate by prefix instead of exact match, and fixes the issue.

cc sameeragarwal for refreshByPath changes
mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <ekl@databricks.com>

Closes #15521 from ericl/fix-caching.
parent 941b3f9a
No related branches found
No related tags found
No related merge requests found
......@@ -343,7 +343,8 @@ abstract class Catalog {
/**
* Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
* contains the given data source path.
* contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
* everything that is cached.
*
* @since 2.0.0
*/
......
......@@ -185,9 +185,10 @@ class CacheManager extends Logging {
plan match {
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
val prefixToInvalidate = qualifiedPath.toString
val invalidate = hr.location.rootPaths
.map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
.contains(qualifiedPath)
.map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
.exists(_.startsWith(prefixToInvalidate))
if (invalidate) hr.location.refresh()
invalidate
case _ => false
......
......@@ -48,13 +48,18 @@ class TableFileCatalog(
private val baseLocation = catalogTable.storage.locationUri
// Populated on-demand by calls to cachedAllPartitions
private var cachedAllPartitions: ListingFileCatalog = null
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(filters).listFiles(Nil)
}
override def refresh(): Unit = {}
override def refresh(): Unit = synchronized {
cachedAllPartitions = null
}
/**
* Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
......@@ -64,7 +69,7 @@ class TableFileCatalog(
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
if (filters.isEmpty) {
cachedAllPartitions
allPartitions
} else {
filterPartitions0(filters)
}
......@@ -89,9 +94,14 @@ class TableFileCatalog(
}
// Not used in the hot path of queries when metastore partition pruning is enabled
lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
def allPartitions: ListingFileCatalog = synchronized {
if (cachedAllPartitions == null) {
cachedAllPartitions = filterPartitions0(Nil)
}
cachedAllPartitions
}
override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
override def inputFiles: Array[String] = allPartitions.inputFiles
}
/**
......
......@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
if (lazyPruningEnabled) {
catalog
} else {
catalog.cachedAllPartitions
catalog.allPartitions
}
}
val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
......
......@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
val df = spark.sql("select * from test")
assert(sql("select * from test").count() == 5)
def deleteRandomFile(): Unit = {
val p = new Path(spark.table("test").inputFiles.head)
assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
}
// Delete a file, then assert that we tried to read it. This means the table was cached.
val p = new Path(spark.table("test").inputFiles.head)
assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
deleteRandomFile()
val e = intercept[SparkException] {
sql("select * from test").count()
}
......@@ -91,6 +95,19 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
// Test refreshing the cache.
spark.catalog.refreshTable("test")
assert(sql("select * from test").count() == 4)
assert(spark.table("test").inputFiles.length == 4)
// Test refresh by path separately since it goes through different code paths than
// refreshTable does.
deleteRandomFile()
spark.catalog.cacheTable("test")
spark.catalog.refreshByPath("/some-invalid-path") // no-op
val e2 = intercept[SparkException] {
sql("select * from test").count()
}
assert(e2.getMessage.contains("FileNotFoundException"))
spark.catalog.refreshByPath(dir.getAbsolutePath)
assert(sql("select * from test").count() == 3)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment