Skip to content
Snippets Groups Projects
Commit e01c6c86 authored by gatorsmile, committed by Michael Armbrust
Browse files

[SPARK-12287][SQL] Support UnsafeRow in MapPartitions/MapGroups/CoGroup

Support Unsafe Row in MapPartitions/MapGroups/CoGroup.

Added a test case for MapPartitions. Since MapGroups and CoGroup are built on AppendColumns, all the related Dataset test cases can already verify the correctness when MapGroups and CoGroup process unsafe rows.

davies cloud-fan Not sure if my understanding is right, please correct me. Thank you!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10398 from gatorsmile/unsafeRowMapGroup.
parent 73b70f07
No related branches found
No related tags found
No related merge requests found
...@@ -370,6 +370,10 @@ case class MapPartitions[T, U](
    output: Seq[Attribute],
    child: SparkPlan) extends UnaryNode {
  override def canProcessSafeRows: Boolean = true
  override def canProcessUnsafeRows: Boolean = true
  override def outputsUnsafeRows: Boolean = true

  override protected def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitionsInternal { iter =>
      val tBoundEncoder = tEncoder.bind(child.output)
...@@ -391,6 +395,7 @@ case class AppendColumns[T, U](
  // We are using an unsafe combiner.
  override def canProcessSafeRows: Boolean = false
  override def canProcessUnsafeRows: Boolean = true
  override def outputsUnsafeRows: Boolean = true

  override def output: Seq[Attribute] = child.output ++ newColumns
...@@ -420,6 +425,10 @@ case class MapGroups[K, T, U](
    output: Seq[Attribute],
    child: SparkPlan) extends UnaryNode {
  override def canProcessSafeRows: Boolean = true
  override def canProcessUnsafeRows: Boolean = true
  override def outputsUnsafeRows: Boolean = true

  override def requiredChildDistribution: Seq[Distribution] =
    ClusteredDistribution(groupingAttributes) :: Nil
...@@ -459,6 +468,10 @@ case class CoGroup[Key, Left, Right, Result](
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode {
  override def canProcessSafeRows: Boolean = true
  override def canProcessUnsafeRows: Boolean = true
  override def outputsUnsafeRows: Boolean = true

  override def requiredChildDistribution: Seq[Distribution] =
    ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment