Skip to content
Snippets Groups Projects
Commit 04e8afe3 authored by Takuya UESHIN's avatar Takuya UESHIN Committed by Josh Rosen
Browse files

[SPARK-13357][SQL] Use generated projection and ordering for TakeOrderedAndProjectNode

`TakeOrderedAndProjectNode` should use generated projection and ordering like other `LocalNode`s.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #11230 from ueshin/issues/SPARK-13357.
parent 1e1e31e0
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.local
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.util.BoundedPriorityQueue
case class TakeOrderedAndProjectNode(
......@@ -30,7 +31,7 @@ case class TakeOrderedAndProjectNode(
child: LocalNode) extends UnaryLocalNode(conf) {
private[this] var projection: Option[Projection] = _
private[this] var ord: InterpretedOrdering = _
private[this] var ord: Ordering[InternalRow] = _
private[this] var iterator: Iterator[InternalRow] = _
private[this] var currentRow: InternalRow = _
......@@ -41,8 +42,8 @@ case class TakeOrderedAndProjectNode(
override def open(): Unit = {
child.open()
projection = projectList.map(new InterpretedProjection(_, child.output))
ord = new InterpretedOrdering(sortOrder, child.output)
projection = projectList.map(UnsafeProjection.create(_, child.output))
ord = GenerateOrdering.generate(sortOrder, child.output)
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[InternalRow](limit)(ord.reverse)
while (child.next()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment