From 421382d0e728940caa3e61bc11237c61f256378a Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian.cs.zju@gmail.com>
Date: Thu, 9 Oct 2014 18:26:43 -0700
Subject: [PATCH] [SPARK-3824][SQL] Sets in-memory table default storage level
 to MEMORY_AND_DISK

Using `MEMORY_AND_DISK` as default storage level for in-memory table caching. Due to the in-memory columnar representation, recomputing an in-memory cached table partitions can be very expensive.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2686 from liancheng/spark-3824 and squashes the following commits:

35d2ed0 [Cheng Lian] Removes extra space
1ab7967 [Cheng Lian] Reduces test data size to fit DiskStore.getBytes()
ba565f0 [Cheng Lian] Maks CachedBatch serializable
07f0204 [Cheng Lian] Sets in-memory table default storage level to MEMORY_AND_DISK
---
 .../main/scala/org/apache/spark/sql/CacheManager.scala | 10 +++++++---
 .../spark/sql/columnar/InMemoryColumnarTableScan.scala |  9 +++++----
 .../scala/org/apache/spark/sql/CachedTableSuite.scala  | 10 +++++-----
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3bf7382ac6..5ab2b5316a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
 private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
@@ -74,10 +74,14 @@ private[sql] trait CacheManager {
     cachedData.clear()
   }
 
-  /** Caches the data produced by the logical representation of the given schema rdd. */
+  /**
+   * Caches the data produced by the logical representation of the given schema rdd.  Unlike
+   * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
+   * the in-memory columnar representation of the underlying table is expensive.
+   */
   private[sql] def cacheQuery(
       query: SchemaRDD,
-      storageLevel: StorageLevel = MEMORY_ONLY): Unit = writeLock {
+      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
     val planToCache = query.queryExecution.optimizedPlan
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 4f79173a26..22ab0e2613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -38,7 +38,7 @@ private[sql] object InMemoryRelation {
     new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
 }
 
-private[sql] case class CachedBatch(buffers: Array[ByteBuffer], stats: Row)
+private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
@@ -91,7 +91,7 @@ private[sql] case class InMemoryRelation(
           val stats = Row.fromSeq(
             columnBuilders.map(_.columnStats.collectedStatistics).foldLeft(Seq.empty[Any])(_ ++ _))
 
-          CachedBatch(columnBuilders.map(_.build()), stats)
+          CachedBatch(columnBuilders.map(_.build().array()), stats)
         }
 
         def hasNext = rowIterator.hasNext
@@ -238,8 +238,9 @@ private[sql] case class InMemoryColumnarTableScan(
       def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = {
         val rows = cacheBatches.flatMap { cachedBatch =>
           // Build column accessors
-          val columnAccessors =
-            requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
+          val columnAccessors = requestedColumnIndices.map { batch =>
+            ColumnAccessor(ByteBuffer.wrap(cachedBatch.buffers(batch)))
+          }
 
           // Extract rows via column accessors
           new Iterator[Row] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index c87ded81fd..444bc95009 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.test.TestSQLContext._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{StorageLevel, RDDBlockId}
 
 case class BigData(s: String)
 
@@ -55,10 +55,10 @@ class CachedTableSuite extends QueryTest {
 
   test("too big for memory") {
     val data = "*" * 10000
-    sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
-    cacheTable("bigData")
-    assert(table("bigData").count() === 1000000L)
-    uncacheTable("bigData")
+    sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
+    table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
+    assert(table("bigData").count() === 200000L)
+    table("bigData").unpersist()
   }
 
   test("calling .cache() should use in-memory columnar caching") {
-- 
GitLab