Skip to content
Snippets Groups Projects
Commit 20fd35df authored by Yash Datta's avatar Yash Datta Committed by Yin Huai
Browse files

[SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan

Many of the fields in InMemoryColumnarTableScan and InMemoryRelation can be made transient.

This reduces my 1000 ms job to about 700 ms. The task size drops from 2.8 MB to ~1300 KB.

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #8604 from saucam/serde.
parent e3b5d6cb
No related branches found
No related tags found
No related merge requests found
......@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
child: SparkPlan,
@transient child: SparkPlan,
tableName: Option[String])(
private var _cachedColumnBuffers: RDD[CachedBatch] = null,
private var _statistics: Statistics = null,
@transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
@transient private var _statistics: Statistics = null,
private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
extends LogicalPlan with MultiInstanceRelation {
......@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
_batchStats
}
val partitionStatistics = new PartitionStatistics(output)
@transient val partitionStatistics = new PartitionStatistics(output)
private def computeSizeInBytes = {
val sizeOfRow: Expression =
......@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
private[sql] case class InMemoryColumnarTableScan(
attributes: Seq[Attribute],
predicates: Seq[Expression],
relation: InMemoryRelation)
@transient relation: InMemoryRelation)
extends LeafNode {
override def output: Seq[Attribute] = attributes
......@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
// Returned filter predicate should return false iff it is impossible for the input expression
// to evaluate to `true' based on statistics collected about this partition batch.
val buildFilter: PartialFunction[Expression, Expression] = {
@transient val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
(buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
......@@ -268,16 +268,23 @@ private[sql] case class InMemoryColumnarTableScan(
readBatches.setValue(0)
}
relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
val partitionFilter = newPredicate(
partitionFilters.reduceOption(And).getOrElse(Literal(true)),
relation.partitionStatistics.schema)
// Copy these values into local variables so that the mapPartitions closure captures only them,
// avoiding serialization of the entire enclosing objects (which referencing them directly would do).
val schema = relation.partitionStatistics.schema
val schemaIndex = schema.zipWithIndex
val relOutput = relation.output
val buffers = relation.cachedColumnBuffers
buffers.mapPartitions { cachedBatchIterator =>
val partitionFilter = newPredicate(
partitionFilters.reduceOption(And).getOrElse(Literal(true)),
schema)
// Find the ordinals and data types of the requested columns. If none are requested, use the
// narrowest (the field with minimum default element size).
val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
val (narrowestOrdinal, narrowestDataType) =
relation.output.zipWithIndex.map { case (a, ordinal) =>
relOutput.zipWithIndex.map { case (a, ordinal) =>
ordinal -> a.dataType
} minBy { case (_, dataType) =>
ColumnType(dataType).defaultSize
......@@ -285,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
Seq(narrowestOrdinal) -> Seq(narrowestDataType)
} else {
attributes.map { a =>
relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
}.unzip
}
......@@ -296,7 +303,7 @@ private[sql] case class InMemoryColumnarTableScan(
// Build column accessors
val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
ColumnAccessor(
relation.output(batchColumnIndex).dataType,
relOutput(batchColumnIndex).dataType,
ByteBuffer.wrap(cachedBatch.buffers(batchColumnIndex)))
}
......@@ -328,7 +335,7 @@ private[sql] case class InMemoryColumnarTableScan(
if (inMemoryPartitionPruningEnabled) {
cachedBatchIterator.filter { cachedBatch =>
if (!partitionFilter(cachedBatch.stats)) {
def statsString: String = relation.partitionStatistics.schema.zipWithIndex.map {
def statsString: String = schemaIndex.map {
case (a, i) =>
val value = cachedBatch.stats.get(i, a.dataType)
s"${a.name}: $value"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment