Skip to content
Snippets Groups Projects
Commit 10204b9d authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Wenchen Fan
Browse files

[SPARK-16995][SQL] TreeNodeException when flat mapping...

[SPARK-16995][SQL] TreeNodeException when flat mapping RelationalGroupedDataset created from DataFrame containing a column created with lit/expr

## What changes were proposed in this pull request?

A TreeNodeException is thrown when executing the following minimal example in Spark 2.0.

    import spark.implicits._
    case class test (x: Int, q: Int)

    val d = Seq(1).toDF("x")
    d.withColumn("q", lit(0)).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
    d.withColumn("q", expr("0")).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show

The problem is at `FoldablePropagation`. The rule will do `transformExpressions` on `LogicalPlan`. The query above contains a `MapGroups` which has a parameter `dataAttributes:Seq[Attribute]`. One attributes in `dataAttributes` will be transformed to an `Alias(literal(0), _)` in `FoldablePropagation`. `Alias` is not an `Attribute` and causes the error.

We can't easily detect such type inconsistency during transforming expressions. A direct approach to this problem is to skip doing `FoldablePropagation` on object operators as they should not contain such expressions.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #14648 from viirya/flat-mapping.
parent e6bef7d5
No related branches found
No related tags found
No related merge requests found
......@@ -727,6 +727,19 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) =>
stop = true
j
// These 3 operators take attributes as constructor parameters, and these attributes
// can't be replaced by alias.
case m: MapGroups =>
stop = true
m
case f: FlatMapGroupsInR =>
stop = true
f
case c: CoGroup =>
stop = true
c
case p: LogicalPlan if !stop => p.transformExpressions {
case a: AttributeReference if foldableMap.contains(a) =>
foldableMap(a)
......
......@@ -878,6 +878,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val ds = spark.createDataset(data)(enc)
checkDataset(ds, (("a", "b"), "c"), (null, "d"))
}
test("SPARK-16995: flat mapping on Dataset containing a column created with lit/expr") {
val df = Seq("1").toDF("a")
import df.sparkSession.implicits._
checkDataset(
df.withColumn("b", lit(0)).as[ClassData]
.groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() })
checkDataset(
df.withColumn("b", expr("0")).as[ClassData]
.groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() })
}
}
case class Generic[T](id: T, value: Double)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment