Skip to content
Snippets Groups Projects
Commit a4bc442c authored by Cheng Lian's avatar Cheng Lian Committed by Michael Armbrust
Browse files

[SPARK-1669][SQL] Made cacheTable idempotent

JIRA issue: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669)

Caching the same table multiple times should end up with only 1 in-memory columnar representation of this table.

Before:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))))
```

After:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))
```

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1183 from liancheng/spark-1669 and squashes the following commits:

68f8a20 [Cheng Lian] Removed an unused import
51bae90 [Cheng Lian] Made cacheTable idempotent
parent 853a2b95
No related branches found
No related tags found
No related merge requests found
......@@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName)
val useCompression =
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
val asInMemoryRelation =
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match {
case _: InMemoryRelation =>
currentTable.logicalPlan
case _ =>
val useCompression =
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
}
catalog.registerTable(None, tableName, asInMemoryRelation)
}
......
......@@ -17,7 +17,9 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.test._
/* Implicits */
......@@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest {
clear()
}
test("SPARK-1669: cacheTable should be idempotent") {
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
cacheTable("testData")
EliminateAnalysisOperators(table("testData").logicalPlan) match {
case _: InMemoryRelation =>
case _ =>
fail("testData should be cached")
}
cacheTable("testData")
EliminateAnalysisOperators(table("testData").logicalPlan) match {
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent")
case _ =>
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment