Commit 32fa0b81 authored by Takuya UESHIN, committed by Wenchen Fan

[SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type.

## What changes were proposed in this pull request?

As mentioned at https://github.com/apache/spark/pull/18680#issuecomment-316820409, once we have more `ColumnVector` implementations, there may (or may not) be a large performance penalty: with several subclasses loaded, call sites typed against the abstract class can defeat inlining and force virtual dispatch.

On the read path, one of the major code paths is the one generated by `ColumnarBatchScan`. It currently refers to the abstract `ColumnVector` type, so the penalty grows as more implementations are added, but we can often know the concrete type from the usage site, e.g. the vectorized Parquet reader uses `OnHeapColumnVector`. By referring to the concrete type directly in the generated code (and casting the columns to it), we can avoid the penalty.
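
A minimal, self-contained sketch of the JIT concern, with hypothetical `Vector` classes standing in for Spark's column vectors (not Spark code):

```scala
// Hypothetical stand-ins for the ColumnVector hierarchy.
abstract class Vector { def getInt(i: Int): Int }
final class OnHeapVector(data: Array[Int]) extends Vector {
  override def getInt(i: Int): Int = data(i)
}

object DispatchDemo {
  // Receiver typed as the abstract class: once several Vector subclasses are
  // loaded, this call site becomes polymorphic and the JIT may stop inlining getInt.
  def sumAbstract(v: Vector): Long = {
    var s = 0L
    var i = 0
    while (i < 1024) { s += v.getInt(i); i += 1 }
    s
  }

  // Receiver typed as the concrete final class: the target of getInt is
  // statically known, so the call can be devirtualized and inlined.
  def sumConcrete(v: OnHeapVector): Long = {
    var s = 0L
    var i = 0
    while (i < 1024) { s += v.getInt(i); i += 1 }
    s
  }

  def main(args: Array[String]): Unit = {
    val v = new OnHeapVector(Array.tabulate(1024)(identity))
    assert(sumAbstract(v) == sumConcrete(v))
  }
}
```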

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18989 from ueshin/issues/SPARK-21781.
parent c7270a46
@@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
   val inMemoryTableScan: InMemoryTableScanExec = null
 
+  def vectorTypes: Option[Seq[String]] = None
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
@@ -79,17 +81,19 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val scanTimeTotalNs = ctx.freshName("scanTime")
     ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
 
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
 
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
+    val columnVectorClzs = vectorTypes.getOrElse(
+      Seq.fill(colVars.size)(classOf[ColumnVector].getName))
+    val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
+      case ((name, columnVectorClz), i) =>
+        ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+        s"$name = ($columnVectorClz) $batch.column($i);"
     }
 
     val nextBatch = ctx.freshName("nextBatch")
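
To see what the new `columnAssigns` evaluates to, here is a standalone rendition of the zip/map above with the codegen machinery stripped out; the variable names and hard-coded class names are illustrative only:

```scala
object ColumnAssignsDemo {
  def main(args: Array[String]): Unit = {
    val batch = "batch"
    val colVars = Seq("colInstance0", "colInstance1")
    // What a format like Parquet would report via vectorTypes; a None here
    // falls back to the abstract ColumnVector class name.
    val vectorTypes: Option[Seq[String]] = Some(Seq.fill(colVars.size)(
      "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"))
    val columnVectorClzs = vectorTypes.getOrElse(Seq.fill(colVars.size)(
      "org.apache.spark.sql.execution.vectorized.ColumnVector"))
    val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
      case ((name, columnVectorClz), i) =>
        s"$name = ($columnVectorClz) $batch.column($i);"
    }
    // Prints the Java assignments that end up in the generated code, now with
    // a cast to the concrete class, e.g.:
    //   colInstance0 = (...OnHeapColumnVector) batch.column(0);
    columnAssigns.foreach(println)
  }
}
```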
@@ -174,6 +174,11 @@ case class FileSourceScanExec(
     false
   }
 
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema)
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
@@ -64,6 +64,16 @@ trait FileFormat {
     false
   }
 
+  /**
+   * Returns concrete column vector class names for each column to be used in a columnar batch
+   * if this format supports returning columnar batch.
+   */
+  def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    None
+  }
+
   /**
    * Returns whether a file with `path` could be splitted or not.
    */
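
A third-party format could plug into this hook as well. The following is a sketch only: `MyOffHeapFormat` is hypothetical, and the class is left abstract so the unrelated `FileFormat` members (`inferSchema`, `prepareWrite`, ...) can be elided:

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.StructType

// Hypothetical format whose vectorized reader fills off-heap vectors.
abstract class MyOffHeapFormat extends FileFormat {
  override def vectorTypes(
      requiredSchema: StructType,
      partitionSchema: StructType): Option[Seq[String]] = {
    Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
      classOf[OffHeapColumnVector].getName))
  }
}
```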
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -272,6 +273,13 @@ class ParquetFileFormat
     schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
+  override def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
+      classOf[OnHeapColumnVector].getName))
+  }
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
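
Note that the hook returns one class name per output column of the scan, data columns first and then partition columns, matching the column layout of the `ColumnarBatch` produced by the vectorized Parquet reader.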