Commit 4b4c3bf3 authored by windpiger's avatar windpiger Committed by Wenchen Fan


[SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache

## What changes were proposed in this pull request?

When we refresh an `InMemoryFileIndex` backed by a `FileStatusCache`, it first uses the `FileStatusCache` to regenerate `cachedLeafFiles` etc., and only then calls `FileStatusCache.invalidateAll`.

This order is wrong: the regeneration still reads the stale entries from the cache, so the refresh has no effect.

```
  override def refresh(): Unit = {
    refresh0()                       // rebuilds from the still-populated cache
    fileStatusCache.invalidateAll()  // invalidation happens too late
  }

  private def refresh0(): Unit = {
    val files = listLeafFiles(rootPaths)
    cachedLeafFiles =
      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
    cachedPartitionSpec = null
  }
```
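To make the ordering bug concrete, here is a minimal, self-contained sketch. `ToyStatusCache` and `ToyFileIndex` are hypothetical stand-ins (not Spark's actual `FileStatusCache`/`InMemoryFileIndex` API) that show why rebuilding before invalidating leaves the index stale, while invalidating first picks up new files:

```scala
// Toy model of the ordering bug; class names are hypothetical stand-ins.
object RefreshOrderDemo extends App {
  class ToyStatusCache {
    private var listing: Option[Seq[String]] = None
    // Return the cached listing, or compute and remember it.
    def getOrList(list: () => Seq[String]): Seq[String] = {
      if (listing.isEmpty) listing = Some(list())
      listing.get
    }
    def invalidateAll(): Unit = { listing = None }
  }

  class ToyFileIndex(cache: ToyStatusCache, listFiles: () => Seq[String]) {
    var cachedLeafFiles: Seq[String] = cache.getOrList(listFiles)

    // Buggy order (pre-patch): the rebuild reads the still-populated cache,
    // so new files are never seen; invalidation comes too late to help.
    def refreshBuggy(): Unit = {
      cachedLeafFiles = cache.getOrList(listFiles)
      cache.invalidateAll()
    }

    // Fixed order (this patch): invalidate first, then rebuild from a
    // fresh listing.
    def refreshFixed(): Unit = {
      cache.invalidateAll()
      cachedLeafFiles = cache.getOrList(listFiles)
    }
  }

  var files = Seq("a.txt")
  val cache = new ToyStatusCache
  val index = new ToyFileIndex(cache, () => files)

  files = Seq("a.txt", "b.txt") // a new file appears on disk

  index.refreshBuggy()
  assert(index.cachedLeafFiles == Seq("a.txt")) // still stale

  index.refreshFixed()
  assert(index.cachedLeafFiles == Seq("a.txt", "b.txt")) // fresh listing

  println("ok")
}
```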
## How was this patch tested?
A unit test was added.

Author: windpiger <songjun@outlook.com>

Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.

(cherry picked from commit a350bc16)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent 04fbb9e0
```
@@ -66,8 +66,8 @@ class InMemoryFileIndex(
   }

   override def refresh(): Unit = {
-    refresh0()
     fileStatusCache.invalidateAll()
+    refresh0()
   }

   private def refresh0(): Unit = {
```
```
@@ -177,6 +177,32 @@ class FileIndexSuite extends SharedSQLContext {
       assert(catalog2.allFiles().nonEmpty)
     }
   }

+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+    withTempDir { dir =>
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      val dirPath = new Path(dir.getAbsolutePath)
+      val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+      val catalog =
+        new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) {
+          def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+          def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+        }
+
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+      assert(catalog.leafDirPaths.isEmpty)
+      assert(catalog.leafFilePaths.isEmpty)
+      catalog.refresh()
+
+      assert(catalog.leafFilePaths.size == 1)
+      assert(catalog.leafFilePaths.head == fs.makeQualified(new Path(file.getAbsolutePath)))
+      assert(catalog.leafDirPaths.size == 1)
+      assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
+    }
+  }
+
 class FakeParentPathFileSystem extends RawLocalFileSystem {
```