From 38e7835347a2e1803b1df5e73cf8b749951b11b2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Wed, 1 Mar 2017 00:19:57 -0800
Subject: [PATCH] [SPARK-19736][SQL] refreshByPath should clear all cached
 plans with the specified path

## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path.

However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17064 from viirya/fix-refreshByPath.
---
 .../spark/sql/execution/CacheManager.scala    | 19 ++++++++++---------
 .../apache/spark/sql/CachedTableSuite.scala   | 16 ++++++++++++++++
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4ca1347008..80138510dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
+      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => true
+      case _ => false
+    }.foreach { data =>
+      val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+      if (dataIndex >= 0) {
+        data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
+        cachedData.remove(dataIndex)
+      }
+      sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1af1a36529..2a0e088437 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -634,4 +634,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath()
+
+      spark.range(10).write.mode("overwrite").parquet(path)
+      spark.read.parquet(path).cache()
+      spark.read.parquet(path).filter($"id" > 4).cache()
+      assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)
+
+      spark.range(20).write.mode("overwrite").parquet(path)
+      spark.catalog.refreshByPath(path)
+      assert(spark.read.parquet(path).count() == 20)
+      assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
+    }
+  }
 }
-- 
GitLab