Skip to content
Snippets Groups Projects
Commit a162c9b3 authored by Michael Armbrust's avatar Michael Armbrust Committed by Patrick Wendell
Browse files

[SPARK-2264][SQL] Fix failing CachedTableSuite

Author: Michael Armbrust <michael@databricks.com>

Closes #1201 from marmbrus/fixCacheTests and squashes the following commits:

9d87ed1 [Michael Armbrust] Use analyzer (which runs to fixed point) instead of manually removing analysis operators.
parent 1978a903
No related branches found
No related tags found
No related merge requests found
...@@ -186,8 +186,8 @@ class SQLContext(@transient val sparkContext: SparkContext) ...@@ -186,8 +186,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Caches the specified table in-memory. */ /** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = { def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName) val currentTable = table(tableName).queryExecution.analyzed
val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match { val asInMemoryRelation = currentTable match {
case _: InMemoryRelation => case _: InMemoryRelation =>
currentTable.logicalPlan currentTable.logicalPlan
...@@ -202,7 +202,7 @@ class SQLContext(@transient val sparkContext: SparkContext) ...@@ -202,7 +202,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Removes the specified table from the in-memory cache. */ /** Removes the specified table from the in-memory cache. */
def uncacheTable(tableName: String): Unit = { def uncacheTable(tableName: String): Unit = {
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match { table(tableName).queryExecution.analyzed match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table, // This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table. // we reregister the RDD as a table.
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) => case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
...@@ -218,8 +218,8 @@ class SQLContext(@transient val sparkContext: SparkContext) ...@@ -218,8 +218,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Returns true if the table is currently cached in-memory. */ /** Returns true if the table is currently cached in-memory. */
def isCached(tableName: String): Boolean = { def isCached(tableName: String): Boolean = {
val relation = catalog.lookupRelation(None, tableName) val relation = table(tableName).queryExecution.analyzed
EliminateAnalysisOperators(relation) match { relation match {
case _: InMemoryRelation => true case _: InMemoryRelation => true
case _ => false case _ => false
} }
......
...@@ -20,13 +20,31 @@ package org.apache.spark.sql ...@@ -20,13 +20,31 @@ package org.apache.spark.sql
import org.apache.spark.sql.TestData._ import org.apache.spark.sql.TestData._
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
class CachedTableSuite extends QueryTest { class CachedTableSuite extends QueryTest {
TestData // Load test tables. TestData // Load test tables.
// NOTE: ALL TESTS ARE IGNORED PENDING SPARK-2264 test("SPARK-1669: cacheTable should be idempotent") {
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
ignore("read from cached table and uncache") { cacheTable("testData")
table("testData").queryExecution.analyzed match {
case _: InMemoryRelation =>
case _ =>
fail("testData should be cached")
}
cacheTable("testData")
table("testData").queryExecution.analyzed match {
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent")
case _ =>
}
}
test("read from cached table and uncache") {
TestSQLContext.cacheTable("testData") TestSQLContext.cacheTable("testData")
checkAnswer( checkAnswer(
...@@ -53,20 +71,20 @@ class CachedTableSuite extends QueryTest { ...@@ -53,20 +71,20 @@ class CachedTableSuite extends QueryTest {
} }
} }
ignore("correct error on uncache of non-cached table") { test("correct error on uncache of non-cached table") {
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
TestSQLContext.uncacheTable("testData") TestSQLContext.uncacheTable("testData")
} }
} }
ignore("SELECT Star Cached Table") { test("SELECT Star Cached Table") {
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar") TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
TestSQLContext.cacheTable("selectStar") TestSQLContext.cacheTable("selectStar")
TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect() TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
TestSQLContext.uncacheTable("selectStar") TestSQLContext.uncacheTable("selectStar")
} }
ignore("Self-join cached") { test("Self-join cached") {
val unCachedAnswer = val unCachedAnswer =
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect() TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
TestSQLContext.cacheTable("testData") TestSQLContext.cacheTable("testData")
...@@ -76,7 +94,7 @@ class CachedTableSuite extends QueryTest { ...@@ -76,7 +94,7 @@ class CachedTableSuite extends QueryTest {
TestSQLContext.uncacheTable("testData") TestSQLContext.uncacheTable("testData")
} }
ignore("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") { test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
TestSQLContext.sql("CACHE TABLE testData") TestSQLContext.sql("CACHE TABLE testData")
TestSQLContext.table("testData").queryExecution.executedPlan match { TestSQLContext.table("testData").queryExecution.executedPlan match {
case _: InMemoryColumnarTableScan => // Found evidence of caching case _: InMemoryColumnarTableScan => // Found evidence of caching
......
...@@ -406,23 +406,4 @@ class SQLQuerySuite extends QueryTest { ...@@ -406,23 +406,4 @@ class SQLQuerySuite extends QueryTest {
) )
clear() clear()
} }
test("SPARK-1669: cacheTable should be idempotent") {
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
cacheTable("testData")
EliminateAnalysisOperators(table("testData").logicalPlan) match {
case _: InMemoryRelation =>
case _ =>
fail("testData should be cached")
}
cacheTable("testData")
EliminateAnalysisOperators(table("testData").logicalPlan) match {
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent")
case _ =>
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment