Commit ce233f18 authored by windpiger, committed by Wenchen Fan

[SPARK-19463][SQL] refresh cache after the InsertIntoHadoopFsRelationCommand

## What changes were proposed in this pull request?

If we first cache a DataSource table and then insert data into it, the cached data should be refreshed after the insert command so that subsequent queries see the new rows.
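A minimal sketch of the behavior this fixes, assuming a local `SparkSession` named `spark` and an illustrative table name (neither is part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only; any file-based DataSource table behaves the same.
val spark = SparkSession.builder()
  .appName("cache-refresh-sketch")
  .master("local[*]")
  .getOrCreate()

// Create and cache a Parquet-backed DataSource table.
spark.range(5).write.saveAsTable("t")
spark.catalog.cacheTable("t")
assert(spark.table("t").count() == 5)

// Insert more rows. With this change, InsertIntoHadoopFsRelationCommand
// calls refreshByPath on the output location, so the cache is refreshed.
spark.range(5).write.mode("append").saveAsTable("t")
assert(spark.table("t").count() == 10)  // before the fix this still saw 5
```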

## How was this patch tested?
Unit tests added, and existing cache-refresh tests updated.

Author: windpiger <songjun@outlook.com>

Closes #16809 from windpiger/refreshCacheAfterInsert.
parent 9734a928
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:

```diff
@@ -147,7 +147,10 @@ case class InsertIntoHadoopFsRelationCommand(
         refreshFunction = refreshPartitionsCallback,
         options = options)
 
+      // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
+      // refresh data cache if table is cached
+      sparkSession.catalog.refreshByPath(outputPath.toString)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }
```
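`refreshByPath` is the public `Catalog` API the command now invokes on its output location: it invalidates cached plans that read files under that path and re-caches them lazily. A user-level sketch of the same call, with an illustrative path (not from this patch):

```scala
// Manual equivalent of the refresh the command now performs internally.
val outputPath = "/tmp/demo_output"  // illustrative path
spark.range(100).write.mode("overwrite").parquet(outputPath)

val cached = spark.read.parquet(outputPath).cache()
cached.count()  // materializes the cache with 100 rows

spark.range(10).write.mode("overwrite").parquet(outputPath)
spark.catalog.refreshByPath(outputPath)  // invalidate stale cache for this path
assert(cached.count() == 10)             // re-reads and re-caches the new files
```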
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala:

```diff
@@ -77,8 +77,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       val df = spark.read.parquet(path).cache()
       assert(df.count() == 1000)
       spark.range(10).write.mode("overwrite").parquet(path)
-      assert(df.count() == 1000)
-      spark.catalog.refreshByPath(path)
       assert(df.count() == 10)
       assert(spark.read.parquet(path).count() == 10)
     }
@@ -91,8 +89,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       val df = spark.read.parquet(path).cache()
       assert(df.count() == 1000)
       spark.range(10).write.mode("append").parquet(path)
-      assert(df.count() == 1000)
-      spark.catalog.refreshByPath(path)
       assert(df.count() == 1010)
       assert(spark.read.parquet(path).count() == 1010)
     }
```
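In both tests above, the two deleted lines first asserted the stale count (still 1000 after the write) and then refreshed manually via `spark.catalog.refreshByPath(path)`. Since the insert command now issues that refresh itself, the cached DataFrame reflects the overwrite or append immediately, and the tests assert the new counts directly.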
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:

```diff
@@ -281,15 +281,15 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       """.stripMargin)
     // jsonTable should be recached.
     assertCached(sql("SELECT * FROM jsonTable"))
-    // TODO we need to invalidate the cached data in InsertIntoHadoopFsRelation
-//    // The cached data is the new data.
-//    checkAnswer(
-//      sql("SELECT a, b FROM jsonTable"),
-//      sql("SELECT a * 2, b FROM jt").collect())
-//
-//    // Verify uncaching
-//    spark.catalog.uncacheTable("jsonTable")
-//    assertCached(sql("SELECT * FROM jsonTable"), 0)
+    // The cached data is the new data.
+    checkAnswer(
+      sql("SELECT a, b FROM jsonTable"),
+      sql("SELECT a * 2, b FROM jt").collect())
+
+    // Verify uncaching
+    spark.catalog.uncacheTable("jsonTable")
+    assertCached(sql("SELECT * FROM jsonTable"), 0)
   }
 
   test("it's not allowed to insert into a relation that is not an InsertableRelation") {
```
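These assertions had been commented out under a TODO precisely because InsertIntoHadoopFsRelation did not invalidate cached data. With the refresh in place they are re-enabled: the recached `jsonTable` returns the newly inserted rows, and uncaching drops the cache entry as asserted.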
sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala:

```diff
@@ -204,13 +204,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     assertCached(table("refreshTable"))
     // Append new data.
     table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
-    // We are still using the old data.
     assertCached(table("refreshTable"))
-    checkAnswer(
-      table("refreshTable"),
-      table("src").collect())
-    // Refresh the table.
-    sql("REFRESH TABLE refreshTable")
     // We are using the new data.
     assertCached(table("refreshTable"))
     checkAnswer(
@@ -249,13 +244,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     assertCached(table("refreshTable"))
     // Append new data.
     table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
-    // We are still using the old data.
     assertCached(table("refreshTable"))
-    checkAnswer(
-      table("refreshTable"),
-      table("src").collect())
-    // Refresh the table.
-    sql(s"REFRESH ${tempPath.toString}")
     // We are using the new data.
     assertCached(table("refreshTable"))
     checkAnswer(
```
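The removed manual refreshes covered writes issued through Spark's own insert path, which now refresh the cache automatically. An explicit refresh is still the right tool when files change outside Spark, for example (reusing the table name from the test above):

```scala
// Still needed when another process writes files directly into the table's
// location, bypassing Spark's insert commands.
sql("REFRESH TABLE refreshTable")
```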