Skip to content
Snippets Groups Projects
Commit f328feda authored by Herman van Hovell, committed by Michael Armbrust
Browse files

[SPARK-11450] [SQL] Add Unsafe Row processing to Expand

This PR enables the Expand operator to process and produce Unsafe Rows.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #9414 from hvanhovell/SPARK-11450.
parent 49f1a820
No related branches found
No related tags found
No related merge requests found
......@@ -128,7 +128,11 @@ object UnsafeProjection {
* Returns an UnsafeProjection for given sequence of Expressions (bounded).
*/
def create(exprs: Seq[Expression]): UnsafeProjection = {
// NOTE(review): the result of this call is discarded; it looks like the removed side of the
// diff hunk rendered inline next to its replacement — confirm against the actual file.
GenerateUnsafeProjection.generate(exprs)
// Swap every CreateStruct / CreateNamedStruct node for its *Unsafe counterpart.
val unsafeExprs = exprs.map(_ transform {
case CreateStruct(children) => CreateStructUnsafe(children)
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
})
// Generate the projection from the rewritten expressions.
GenerateUnsafeProjection.generate(unsafeExprs)
}
def create(expr: Expression): UnsafeProjection = create(Seq(expr))
......
......@@ -41,14 +41,21 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)
override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
// Function that turns one list of expressions into a projection.
// Uses UnsafeProjection.create when outputsUnsafeRows is true and newMutableProjection
// otherwise; both receive child.output as their second argument.
private[this] val projection = {
if (outputsUnsafeRows) {
(exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
} else {
// newMutableProjection returns a factory; the trailing () invokes it immediately.
(exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
}
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
// TODO Move out projection objects creation and transfer to
// workers via closure. However we can't assume the Projection
// is serializable because of the code gen, so we have to
// create the projections within each of the partition processing.
val groups = projections.map(ee => newProjection(ee, child.output)).toArray
val groups = projections.map(projection).toArray
new Iterator[InternalRow] {
private[this] var result: InternalRow = _
private[this] var idx = -1 // -1 means the initial state
......
......@@ -67,16 +67,10 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan)
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
/** Rewrite the project list to use unsafe expressions as needed. */
protected val unsafeProjectList = projectList.map(_ transform {
case CreateStruct(children) => CreateStructUnsafe(children)
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
})
protected override def doExecute(): RDD[InternalRow] = {
val numRows = longMetric("numRows")
child.execute().mapPartitions { iter =>
val project = UnsafeProjection.create(unsafeProjectList, child.output)
val project = UnsafeProjection.create(projectList, child.output)
iter.map { row =>
numRows += 1
project(row)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Alias, Literal}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.IntegerType
/**
 * Tests for the Expand physical operator, covering both unsafe and safe child rows.
 */
class ExpandSuite extends SparkPlanTest with SharedSQLContext {
  import testImplicits.localSeqToDataFrameHolder

  /**
   * Runs Expand with one projection per group id over a 1000-row, single-column input and
   * checks that each input value is emitted once per projection, paired with that group id.
   *
   * @param f wraps the child plan before it is handed to Expand
   */
  private def testExpand(f: SparkPlan => SparkPlan): Unit = {
    // Single source of truth for the number of projections, so the plan under test and the
    // expected answer cannot drift apart.
    val numGroups = 2
    val input = (1 to 1000).map(Tuple1.apply)
    val projections = Seq.tabulate(numGroups) { gid =>
      Alias(BoundReference(0, IntegerType, false), "id")() ::
        Alias(Literal(gid), "gid")() :: Nil
    }
    val attributes = projections.head.map(_.toAttribute)
    val expected = input.flatMap { case Tuple1(id) =>
      Seq.tabulate(numGroups)(gid => Row(id, gid))
    }
    checkAnswer(
      input.toDF(),
      plan => Expand(projections, attributes, f(plan)),
      expected
    )
  }

  test("inheriting child row type") {
    val exprs = AttributeReference("a", IntegerType, false)() :: Nil
    // The child is wrapped in ConvertToUnsafe; Expand is expected to report unsafe rows too.
    val plan = Expand(Seq(exprs), exprs, ConvertToUnsafe(LocalTableScan(exprs, Seq.empty)))
    assert(plan.outputsUnsafeRows, "Expand should inherit the created row type from its child.")
  }

  test("expanding UnsafeRows") {
    testExpand(ConvertToUnsafe)
  }

  test("expanding SafeRows") {
    testExpand(identity)
  }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment