Skip to content
Snippets Groups Projects
Commit 932b24fd authored by Yin Huai's avatar Yin Huai Committed by Reynold Xin
Browse files

[SPARK-9949] [SQL] Fix TakeOrderedAndProject's output.

https://issues.apache.org/jira/browse/SPARK-9949

Author: Yin Huai <yhuai@databricks.com>

Closes #8179 from yhuai/SPARK-9949.
parent 18a761ef
No related branches found
No related tags found
No related merge requests found
......@@ -237,7 +237,10 @@ case class TakeOrderedAndProject(
projectList: Option[Seq[NamedExpression]],
child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override def output: Seq[Attribute] = {
val projectOutput = projectList.map(_.map(_.toAttribute))
projectOutput.getOrElse(child.output)
}
override def outputPartitioning: Partitioning = SinglePartition
......@@ -263,6 +266,13 @@ case class TakeOrderedAndProject(
protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
override def outputOrdering: Seq[SortOrder] = sortOrder
override def simpleString: String = {
val orderByString = sortOrder.mkString("[", ",", "]")
val outputString = output.mkString("[", ",", "]")
s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
}
}
/**
......
......@@ -162,9 +162,23 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
}
test("efficient limit -> project -> sort") {
val query = testData.sort('key).select('value).limit(2).logicalPlan
val planned = ctx.planner.TakeOrderedAndProject(query)
assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
{
val query =
testData.select('key, 'value).sort('key).limit(2).logicalPlan
val planned = ctx.planner.TakeOrderedAndProject(query)
assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
assert(planned.head.output === testData.select('key, 'value).logicalPlan.output)
}
{
// We need to make sure TakeOrderedAndProject's output is correct when we push a project
// into it.
val query =
testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan
val planned = ctx.planner.TakeOrderedAndProject(query)
assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
assert(planned.head.output === testData.select('value, 'key).logicalPlan.output)
}
}
test("PartitioningCollection") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment