Skip to content
Snippets Groups Projects
Commit aeddeafc authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-9667][SQL] followup: Use GenerateUnsafeProjection.canSupport to test...

[SPARK-9667][SQL] followup: Use GenerateUnsafeProjection.canSupport to test Exchange supported data types.

This way we recursively test the data types.

cc chenghao-intel

Author: Reynold Xin <rxin@databricks.com>

Closes #8036 from rxin/cansupport and squashes the following commits:

f7302ff [Reynold Xin] Can GenerateUnsafeProjection.canSupport to test Exchange supported data types.
parent 9897cc5e
No related branches found
No related tags found
No related merge requests found
......@@ -27,9 +27,9 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.util.MutablePair
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
......@@ -43,18 +43,11 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def nodeName: String = if (tungstenMode) "TungstenExchange" else "Exchange"
/**
* Returns true iff the children outputs aggregate UDTs that are not part of the SQL type.
* This only happens with the old aggregate implementation and should be removed in 1.6.
* Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = {
val unserializableUDT = child.schema.exists(_.dataType match {
case _: UserDefinedType[_] => true
case _ => false
})
// Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
// an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
// ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
!unserializableUDT && !newPartitioning.isInstanceOf[RangePartitioning]
GenerateUnsafeProjection.canSupport(child.schema) &&
!newPartitioning.isInstanceOf[RangePartitioning]
}
override def outputPartitioning: Partitioning = newPartitioning
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment