Skip to content
Snippets Groups Projects
Commit a83accfc authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Wenchen Fan
Browse files

[SPARK-19065][SQL] Don't inherit expression id in dropDuplicates

## What changes were proposed in this pull request?

`dropDuplicates` will create an Alias using the same exprId, so `StreamExecution` should also replace Alias if necessary.

## How was this patch tested?

test("SPARK-19065: dropDuplicates should not create expressions using the same id")

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16564 from zsxwing/SPARK-19065.
parent 20e62806
No related branches found
No related tags found
No related merge requests found
...@@ -2003,10 +2003,7 @@ class Dataset[T] private[sql]( ...@@ -2003,10 +2003,7 @@ class Dataset[T] private[sql](
if (groupColExprIds.contains(attr.exprId)) { if (groupColExprIds.contains(attr.exprId)) {
attr attr
} else { } else {
// Removing duplicate rows should not change output attributes. We should keep Alias(new First(attr).toAggregateExpression(), attr.name)()
// the original exprId of the attribute. Otherwise, to select a column in original
// dataset will cause analysis exception due to unresolved attribute.
Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
} }
} }
Aggregate(groupCols, aggCols, logicalPlan) Aggregate(groupCols, aggCols, logicalPlan)
......
...@@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext { ...@@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
(1, 2), (1, 1), (2, 1), (2, 2)) (1, 2), (1, 1), (2, 1), (2, 2))
} }
test("dropDuplicates should not change child plan output") {
val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
checkDataset(
ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
("a", 1), ("b", 1))
}
test("SPARK-16097: Encoders.tuple should handle null object correctly") { test("SPARK-16097: Encoders.tuple should handle null object correctly") {
val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING) val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
val data = Seq((("a", "b"), "c"), (null, "d")) val data = Seq((("a", "b"), "c"), (null, "d"))
......
...@@ -304,6 +304,32 @@ class StreamSuite extends StreamTest { ...@@ -304,6 +304,32 @@ class StreamSuite extends StreamTest {
q.stop() q.stop()
} }
} }
test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
withTempPath { testPath =>
val data = Seq((1, 2), (2, 3), (3, 4))
data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath)
val schema = spark.read.json(testPath.getCanonicalPath).schema
val query = spark
.readStream
.schema(schema)
.json(testPath.getCanonicalPath)
.dropDuplicates("_1")
.writeStream
.format("memory")
.queryName("testquery")
.outputMode("complete")
.start()
try {
query.processAllAvailable()
if (query.exception.isDefined) {
throw query.exception.get
}
} finally {
query.stop()
}
}
}
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment