From abf29187f0342b607fcefe269391d4db58d2a957 Mon Sep 17 00:00:00 2001 From: Cheng Lian <lian@databricks.com> Date: Thu, 20 Nov 2014 13:12:24 -0800 Subject: [PATCH] [SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name This PR enables the Web UI storage tab to show the in-memory table name instead of the mysterious query plan string as the name of the in-memory columnar RDD. Note that after #2501, a single columnar RDD can be shared by multiple in-memory tables, as long as their query results are the same. In this case, only the first cached table name is shown. For example: ```sql CACHE TABLE first AS SELECT * FROM src; CACHE TABLE second AS SELECT * FROM src; ``` The Web UI only shows "In-memory table first". <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3383) <!-- Reviewable:end --> Author: Cheng Lian <lian@databricks.com> Closes #3383 from liancheng/columnar-rdd-name and squashes the following commits: 071907f [Cheng Lian] Fixes tests 12ddfa6 [Cheng Lian] Names in-memory columnar RDD with corresponding table name --- .../scala/org/apache/spark/sql/CacheManager.scala | 9 +++++++-- .../scala/org/apache/spark/sql/SchemaRDD.scala | 2 +- .../sql/columnar/InMemoryColumnarTableScan.scala | 15 +++++++++------ .../org/apache/spark/sql/execution/commands.scala | 5 ++--- .../org/apache/spark/sql/CachedTableSuite.scala | 2 +- .../sql/columnar/InMemoryColumnarQuerySuite.scala | 6 +++--- 6 files changed, 23 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index 2e7abac1f1..3c9439b2e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -46,7 +46,7 @@ private[sql] trait CacheManager { def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty /** Caches the specified table in-memory. */ - def cacheTable(tableName: String): Unit = cacheQuery(table(tableName)) + def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName)) /** Removes the specified table from the in-memory cache. */ def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName)) @@ -81,6 +81,7 @@ private[sql] trait CacheManager { */ private[sql] def cacheQuery( query: SchemaRDD, + tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { val planToCache = query.queryExecution.analyzed if (lookupCachedData(planToCache).nonEmpty) { @@ -90,7 +91,11 @@ private[sql] trait CacheManager { CachedData( planToCache, InMemoryRelation( - useCompression, columnBatchSize, storageLevel, query.queryExecution.executedPlan)) + useCompression, + columnBatchSize, + storageLevel, + query.queryExecution.executedPlan, + tableName)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index fbec2f9f4b..904a276ef3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -475,7 +475,7 @@ class SchemaRDD( } override def persist(newLevel: StorageLevel): this.type = { - sqlContext.cacheQuery(this, newLevel) + sqlContext.cacheQuery(this, None, newLevel) this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 881d32b105..0cebe823b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -36,8 +36,9 @@ private[sql] object InMemoryRelation { useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, - child: SparkPlan): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)() + child: SparkPlan, + tableName: Option[String]): InMemoryRelation = + new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() } private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row) @@ -47,7 +48,8 @@ private[sql] case class InMemoryRelation( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, - child: SparkPlan)( + child: SparkPlan, + tableName: Option[String])( private var _cachedColumnBuffers: RDD[CachedBatch] = null, private var _statistics: Statistics = null) extends LogicalPlan with MultiInstanceRelation { @@ -137,13 +139,13 @@ private[sql] case class InMemoryRelation( } }.persist(storageLevel) - cached.setName(child.toString) + cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString)) _cachedColumnBuffers = cached } def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { InMemoryRelation( - newOutput, useCompression, batchSize, storageLevel, child)( + newOutput, useCompression, batchSize, storageLevel, child, tableName)( _cachedColumnBuffers, statisticsToBePropagated) } @@ -155,7 +157,8 @@ private[sql] case class InMemoryRelation( useCompression, batchSize, storageLevel, - child)( + child, + tableName)( _cachedColumnBuffers, statisticsToBePropagated).asInstanceOf[this.type] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index f23b9c48cf..afe3f3f074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -160,12 +160,11 @@ case class CacheTableCommand( import sqlContext._ plan.foreach(_.registerTempTable(tableName)) - val schemaRDD = table(tableName) - schemaRDD.cache() + cacheTable(tableName) if (!isLazy) { // Performs eager caching - schemaRDD.count() + table(tableName).count() } Seq.empty[Row] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 765fa82776..042210176a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -123,7 +123,7 @@ class CachedTableSuite extends QueryTest { cacheTable("testData") assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") { table("testData").queryExecution.withCachedData.collect { - case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r + case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r }.size } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index 15903d07df..fc95dccc74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -29,7 +29,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { test("simple columnar query") { val plan = executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) checkAnswer(scan, testData.collect().toSeq) } @@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { test("projection") { val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) checkAnswer(scan, testData.collect().map { case Row(key: Int, value: String) => value -> key @@ -53,7 +53,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { val plan = executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) checkAnswer(scan, testData.collect().toSeq) checkAnswer(scan, testData.collect().toSeq) -- GitLab