Skip to content
Snippets Groups Projects
Commit 606f99b9 authored by gatorsmile's avatar gatorsmile Committed by Davies Liu
Browse files

[SPARK-12288] [SQL] Support UnsafeRow in Coalesce/Except/Intersect.

Add UnsafeRow support to the Coalesce, Except, and Intersect operators.

@davies, could you review whether my code changes are OK? Thank you!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10285 from gatorsmile/unsafeSupportCIE.
parent d13ff82c
No related branches found
No related tags found
No related merge requests found
...@@ -137,7 +137,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { ...@@ -137,7 +137,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
} }
} }
} }
override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
override def canProcessUnsafeRows: Boolean = true override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true override def canProcessSafeRows: Boolean = true
protected override def doExecute(): RDD[InternalRow] = protected override def doExecute(): RDD[InternalRow] =
...@@ -250,7 +250,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode { ...@@ -250,7 +250,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
child.execute().coalesce(numPartitions, shuffle = false) child.execute().coalesce(numPartitions, shuffle = false)
} }
override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
} }
/** /**
...@@ -263,6 +265,10 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { ...@@ -263,6 +265,10 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
protected override def doExecute(): RDD[InternalRow] = { protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
} }
override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
} }
/** /**
...@@ -275,6 +281,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { ...@@ -275,6 +281,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
protected override def doExecute(): RDD[InternalRow] = { protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
} }
override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
} }
/** /**
......
...@@ -58,6 +58,41 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { ...@@ -58,6 +58,41 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
assert(!preparedPlan.outputsUnsafeRows) assert(!preparedPlan.outputsUnsafeRows)
} }
// Prepares a Coalesce plan over the unsafe-row fixture, then checks that the
// prepared plan holds exactly one row-format converter and reports unsafe output.
test("coalesce can process unsafe rows") {
  val coalescePlan = Coalesce(numPartitions = 1, child = outputsUnsafe)
  val prepared = sqlContext.prepareForExecution.execute(coalescePlan)
  val converters = getConverters(prepared)
  assert(converters.size === 1)
  assert(prepared.outputsUnsafeRows)
}
// Prepares an Except plan whose two sides are both the unsafe-row fixture, then
// checks that the prepared plan holds two converters and reports unsafe output.
test("except can process unsafe rows") {
  val exceptPlan = Except(left = outputsUnsafe, right = outputsUnsafe)
  val prepared = sqlContext.prepareForExecution.execute(exceptPlan)
  val converters = getConverters(prepared)
  assert(converters.size === 2)
  assert(prepared.outputsUnsafeRows)
}
// Builds an Except plan with one safe-row side and one unsafe-row side. The raw
// plan must accept both row formats, and the prepared plan must report unsafe output.
test("except requires all of its input rows' formats to agree") {
  val mixedPlan = Except(left = outputsSafe, right = outputsUnsafe)
  val acceptsBothFormats = mixedPlan.canProcessSafeRows && mixedPlan.canProcessUnsafeRows
  assert(acceptsBothFormats)
  val prepared = sqlContext.prepareForExecution.execute(mixedPlan)
  assert(prepared.outputsUnsafeRows)
}
// Prepares an Intersect plan whose two sides are both the unsafe-row fixture, then
// checks that the prepared plan holds two converters and reports unsafe output.
test("intersect can process unsafe rows") {
  val intersectPlan = Intersect(left = outputsUnsafe, right = outputsUnsafe)
  val prepared = sqlContext.prepareForExecution.execute(intersectPlan)
  val converters = getConverters(prepared)
  assert(converters.size === 2)
  assert(prepared.outputsUnsafeRows)
}
// Builds an Intersect plan with one safe-row side and one unsafe-row side. The raw
// plan must accept both row formats, and the prepared plan must report unsafe output.
test("intersect requires all of its input rows' formats to agree") {
  val mixedPlan = Intersect(left = outputsSafe, right = outputsUnsafe)
  val acceptsBothFormats = mixedPlan.canProcessSafeRows && mixedPlan.canProcessUnsafeRows
  assert(acceptsBothFormats)
  val prepared = sqlContext.prepareForExecution.execute(mixedPlan)
  assert(prepared.outputsUnsafeRows)
}
test("execute() fails an assertion if inputs rows are of different formats") { test("execute() fails an assertion if inputs rows are of different formats") {
val e = intercept[AssertionError] { val e = intercept[AssertionError] {
Union(Seq(outputsSafe, outputsUnsafe)).execute() Union(Seq(outputsSafe, outputsUnsafe)).execute()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment