Commit abf29187 authored by Cheng Lian, committed by Michael Armbrust

[SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name

This PR makes the Web UI storage tab show the in-memory table name, rather than the mysterious query plan string, as the name of the in-memory columnar RDD.

Note that after #2501, a single columnar RDD can be shared by multiple in-memory tables, as long as their query results are the same. In this case, only the name of the first cached table is shown. For example:

```sql
CACHE TABLE first AS SELECT * FROM src;
CACHE TABLE second AS SELECT * FROM src;
```

The Web UI only shows "In-memory table first".
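
A quick way to check the effect from a Spark shell is to look at the RDD names that back the storage tab. This is only a sketch: it assumes a shell where `sc` and a `SQLContext` named `sqlContext` already exist with a registered `src` table, and it uses the developer API `SparkContext.getRDDStorageInfo`.

```scala
// Cache the same query under two names; after #2501 they share one columnar RDD.
sqlContext.sql("CACHE TABLE first AS SELECT * FROM src")
sqlContext.sql("CACHE TABLE second AS SELECT * FROM src")

// With this patch the shared RDD is named after the first cached table,
// so this should print "In-memory table first" rather than a query plan string.
sc.getRDDStorageInfo.map(_.name).foreach(println)
```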


Author: Cheng Lian <lian@databricks.com>

Closes #3383 from liancheng/columnar-rdd-name and squashes the following commits:

071907f [Cheng Lian] Fixes tests
12ddfa6 [Cheng Lian] Names in-memory columnar RDD with corresponding table name
Parent commit: 15cacc81

```diff
@@ -46,7 +46,7 @@ private[sql] trait CacheManager {
   def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty
 
   /** Caches the specified table in-memory. */
-  def cacheTable(tableName: String): Unit = cacheQuery(table(tableName))
+  def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName))
 
   /** Removes the specified table from the in-memory cache. */
   def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName))
@@ -81,6 +81,7 @@ private[sql] trait CacheManager {
    */
   private[sql] def cacheQuery(
       query: SchemaRDD,
+      tableName: Option[String] = None,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
     val planToCache = query.queryExecution.analyzed
 
     if (lookupCachedData(planToCache).nonEmpty) {
@@ -90,7 +91,11 @@ private[sql] trait CacheManager {
       CachedData(
         planToCache,
         InMemoryRelation(
-          useCompression, columnBatchSize, storageLevel, query.queryExecution.executedPlan))
+          useCompression,
+          columnBatchSize,
+          storageLevel,
+          query.queryExecution.executedPlan,
+          tableName))
     }
   }
```
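
Because the new `tableName` parameter sits between `query` and `storageLevel`, a caller that used to pass a storage level as the second positional argument no longer compiles unchanged, which is why `SchemaRDD.persist` in the next hunk passes `None` explicitly. The sketch below is not Spark code, just a hypothetical stand-in with the same parameter order to show which call shapes the defaults allow (the real `cacheQuery` is `private[sql]` and takes a `SchemaRDD`).

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.{MEMORY_AND_DISK, MEMORY_ONLY}

// Hypothetical stand-in mirroring the new parameter order of cacheQuery.
def cacheQuery(
    query: String,                                   // placeholder for SchemaRDD
    tableName: Option[String] = None,
    storageLevel: StorageLevel = MEMORY_AND_DISK): Unit =
  println(s"cache $query as ${tableName.getOrElse("<plan string>")} at $storageLevel")

cacheQuery("q")                                      // both defaults: unnamed, MEMORY_AND_DISK
cacheQuery("q", Some("people"))                      // named table, default storage level
cacheQuery("q", None, MEMORY_ONLY)                   // what SchemaRDD.persist now does
cacheQuery("q", storageLevel = MEMORY_ONLY)          // equivalent, via a named argument
```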

```diff
@@ -475,7 +475,7 @@ class SchemaRDD(
   }
 
   override def persist(newLevel: StorageLevel): this.type = {
-    sqlContext.cacheQuery(this, newLevel)
+    sqlContext.cacheQuery(this, None, newLevel)
     this
   }
```

```diff
@@ -36,8 +36,9 @@ private[sql] object InMemoryRelation {
       useCompression: Boolean,
       batchSize: Int,
       storageLevel: StorageLevel,
-      child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
+      child: SparkPlan,
+      tableName: Option[String]): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
 }
 
 private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
@@ -47,7 +48,8 @@ private[sql] case class InMemoryRelation(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
-    child: SparkPlan)(
+    child: SparkPlan,
+    tableName: Option[String])(
     private var _cachedColumnBuffers: RDD[CachedBatch] = null,
     private var _statistics: Statistics = null)
   extends LogicalPlan with MultiInstanceRelation {
@@ -137,13 +139,13 @@ private[sql] case class InMemoryRelation(
       }
     }.persist(storageLevel)
 
-    cached.setName(child.toString)
+    cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString))
     _cachedColumnBuffers = cached
   }
 
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
-      newOutput, useCompression, batchSize, storageLevel, child)(
+      newOutput, useCompression, batchSize, storageLevel, child, tableName)(
       _cachedColumnBuffers, statisticsToBePropagated)
   }
@@ -155,7 +157,8 @@ private[sql] case class InMemoryRelation(
       useCompression,
       batchSize,
       storageLevel,
-      child)(
+      child,
+      tableName)(
       _cachedColumnBuffers,
       statisticsToBePropagated).asInstanceOf[this.type]
   }
```
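
The naming rule introduced above is simple enough to state on its own. The following self-contained sketch mirrors the expression passed to `RDD.setName`; `planString` is a made-up stand-in for `child.toString`.

```scala
object RddNameSketch extends App {
  // Same fallback logic as the setName call in InMemoryRelation.
  def rddName(tableName: Option[String], planString: String): String =
    tableName.map(n => s"In-memory table $n").getOrElse(planString)

  println(rddName(Some("first"), "InMemoryColumnarTableScan [key,value] ..."))  // In-memory table first
  println(rddName(None, "InMemoryColumnarTableScan [key,value] ..."))           // falls back to the plan string
}
```

This is also why a plain `schemaRDD.persist(level)`, which passes `None`, keeps showing the plan string, while `CACHE TABLE` and `cacheTable` get the friendlier name.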

```diff
@@ -160,12 +160,11 @@ case class CacheTableCommand(
     import sqlContext._
 
     plan.foreach(_.registerTempTable(tableName))
-    val schemaRDD = table(tableName)
-    schemaRDD.cache()
+    cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
-      schemaRDD.count()
+      table(tableName).count()
     }
 
     Seq.empty[Row]
```
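
For context, the `isLazy` branch corresponds to the optional `LAZY` keyword of the SQL command: eager caching (the default) forces materialization with a `count()` before the statement returns, while lazy caching defers it to the first scan. A hedged usage sketch, again assuming a shell with `sqlContext` and a registered `src` table, and assuming the `LAZY` keyword is available in this build; the table names are made up:

```scala
// Eager (default): CacheTableCommand itself runs count(), so the columnar
// buffers are built before the statement returns.
sqlContext.sql("CACHE TABLE eager_tab AS SELECT * FROM src")

// Lazy: the table is registered and marked cached, but the buffers are only
// built when the table is first scanned.
sqlContext.sql("CACHE LAZY TABLE lazy_tab AS SELECT * FROM src")
sqlContext.sql("SELECT COUNT(*) FROM lazy_tab").collect()  // first scan materializes the cache
```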

```diff
@@ -123,7 +123,7 @@ class CachedTableSuite extends QueryTest {
     cacheTable("testData")
     assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
       table("testData").queryExecution.withCachedData.collect {
-        case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
+        case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r
       }.size
     }
```

```diff
@@ -29,7 +29,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("simple columnar query") {
     val plan = executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
   }
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("projection") {
     val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -53,7 +53,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
```