Commit 9ac68dbc authored by Yin Huai

[SPARK-17549][SQL] Revert "[SPARK-17549][SQL] Only collect table size stat in driver for cached relation."

This reverts commit 39e2bad6 because of the problem mentioned at https://issues.apache.org/jira/browse/SPARK-17549?focusedCommentId=15505060&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15505060

Author: Yin Huai <yhuai@databricks.com>

Closes #15157 from yhuai/revert-SPARK-17549.
parent a6aade00
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.columnar

+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.StringUtils

 import org.apache.spark.network.util.JavaUtils
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.CollectionAccumulator


 object InMemoryRelation {
@@ -61,7 +63,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: CollectionAccumulator[InternalRow] =
+      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
   extends logical.LeafNode with MultiInstanceRelation {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -71,12 +74,21 @@ case class InMemoryRelation(

   @transient val partitionStatistics = new PartitionStatistics(output)

   override lazy val statistics: Statistics = {
-    if (batchStats.value == 0L) {
+    if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      Statistics(sizeInBytes = batchStats.value.longValue)
+      // Underlying columnar RDD has been materialized, required information has also been
+      // collected via the `batchStats` accumulator.
+      val sizeOfRow: Expression =
+        BindReferences.bindReference(
+          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+          partitionStatistics.schema)
+
+      val sizeInBytes =
+        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+      Statistics(sizeInBytes = sizeInBytes)
     }
   }
@@ -127,10 +139,10 @@ case class InMemoryRelation(
           rowCount += 1
         }

-        batchStats.add(totalSize)
-
        val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
          .flatMap(_.values))
+
+        batchStats.add(stats)
        CachedBatch(rowCount, columnBuilders.map { builder =>
          JavaUtils.bufferToArray(builder.build())
        }, stats)
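Editorial note on the restored logic: each cached batch packs its per-column statistics into an InternalRow and appends it to the batchStats CollectionAccumulator (the batchStats.add(stats) line above); at planning time the driver binds a sum-of-column-sizes expression against the statistics schema and evaluates it over every collected row to produce sizeInBytes. A minimal plain-Scala sketch of that two-step flow, using hypothetical stand-ins (BatchStats, sizeOfBatch, estimatedSizeInBytes) rather than Spark's Catalyst or accumulator APIs:

    // Sketch only: models one stats record per cached batch and the
    // driver-side sum of a size expression over all collected records.
    object BatchStatsSketch {
      // Stand-in for the InternalRow of collected per-column statistics.
      final case class BatchStats(columnSizesInBytes: Seq[Long])

      // Stand-in for output.map(...sizeInBytes).reduce(Add) bound to the stats schema.
      def sizeOfBatch(stats: BatchStats): Long = stats.columnSizesInBytes.sum

      // Stand-in for batchStats.value.asScala.map(row => sizeOfRow.eval(row)).sum.
      def estimatedSizeInBytes(collected: Seq[BatchStats]): Long =
        collected.map(sizeOfBatch).sum

      def main(args: Array[String]): Unit = {
        // Two batches with two columns each, sizes in bytes.
        val collected = Seq(BatchStats(Seq(16L, 120L)), BatchStats(Seq(8L, 64L)))
        println(estimatedSizeInBytes(collected)) // prints 208
      }
    }

A visible consequence of this design is that one statistics row is shipped to the driver per cached batch, which is exactly what the reverted commit had tried to avoid by accumulating a single running byte count in a LongAccumulator instead.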
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

@@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
-
-  test("SPARK-17549: cached table size should be correctly calculated") {
-    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
-    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
-
-    // Materialize the data.
-    val expectedAnswer = data.collect()
-    checkAnswer(cached, expectedAnswer)
-
-    // Check that the right size was calculated.
-    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
-  }
 }
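Editorial note on the removed test: it exercised the LongAccumulator behavior that this commit reverts, asserting that after materializing ten cached Int rows, batchStats.value equals expectedAnswer.size * INT.defaultSize. Assuming INT.defaultSize is 4 (the byte width of Spark's IntegerType), the expected value works out as below; rowCount and intDefaultSize are illustrative names, not Spark identifiers:

    // Sketch of the removed assertion's arithmetic.
    object RemovedAssertionArithmetic {
      def main(args: Array[String]): Unit = {
        val rowCount = 10      // parallelize(1 to 10, 5).toDF() yields ten rows
        val intDefaultSize = 4 // assumed value of INT.defaultSize, in bytes
        println(rowCount * intDefaultSize) // 40, the total the LongAccumulator held
      }
    }

The test is removed along with the change it covered: once batchStats is again a CollectionAccumulator[InternalRow], its value is a list of statistics rows rather than a byte count, so the assertion no longer applies.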