[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union

## What changes were proposed in this pull request?

When a user appends a column to a DataFrame using a "nondeterministic" function, e.g., `rand`, `randn`, or `monotonically_increasing_id`, the expected semantics are the following:

- The value in each row should remain unchanged, as if we materialized the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values in nondeterministic columns might change if we call `union` or `coalesce` afterwards. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column.

See the unit tests below or JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionsWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen), and it is called right after object creation in `mapPartitionsWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function, so that it can be initialized properly.

## How was this patch tested?

Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)

cc: rxin davies

Author: Xiangrui Meng <meng@databricks.com>

Closes #15567 from mengxr/SPARK-14393.

(cherry picked from commit 02f20310)
Signed-off-by: Reynold Xin <rxin@databricks.com>
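The mismatch described above can be illustrated with a small simulation in plain Scala, without Spark. This is only a sketch under stated assumptions: `SeededRand`, `evalPartition`, and `PartitionIndexSketch` are hypothetical names, not Spark classes, and seeding with `seed + partitionIndex` is a simplified stand-in for how a `rand`-like expression might derive its per-partition state. It models a two-partition dataset that is coalesced into a single task, and compares seeding by the task's partition id with seeding by the index of the partition where the column was defined.

```scala
import scala.util.Random

// Hypothetical stand-in for a nondeterministic expression such as rand(seed).
// Its state has to be initialized with a partition index before rows are evaluated.
final class SeededRand(seed: Long) {
  private var rng: Random = new Random(seed)

  // Assumption for this sketch: per-partition state is derived from seed + partitionIndex.
  def initialize(partitionIndex: Int): Unit = {
    rng = new Random(seed + partitionIndex)
  }

  def eval(): Double = rng.nextDouble()
}

object PartitionIndexSketch {
  val seed = 42L

  // Two partitions of the dataset on which the column is defined.
  val partitions: Vector[Seq[Int]] = Vector(Seq(1, 2), Seq(3, 4))

  // Create a fresh expression, initialize it with `index`, and evaluate it once per row.
  def evalPartition(index: Int, rows: Seq[Int]): Seq[Double] = {
    val expr = new SeededRand(seed)
    expr.initialize(index)
    rows.map(_ => expr.eval())
  }

  // Expected semantics: as if the column were materialized immediately,
  // with every partition seeded by its own index.
  val materialized: Vector[Double] =
    partitions.zipWithIndex.flatMap { case (rows, i) => evalPartition(i, rows) }

  // After a coalesce to one partition, a single task (id 0) computes both parents.
  // Seeding by the task's partition id gives the second parent the wrong state.
  val taskPartitionId = 0
  val viaTaskId: Vector[Double] =
    partitions.flatMap(rows => evalPartition(taskPartitionId, rows))

  // Seeding by the index of the partition where the column was defined
  // reproduces the materialized values.
  val viaDefiningIndex: Vector[Double] =
    partitions.zipWithIndex.flatMap { case (rows, i) => evalPartition(i, rows) }

  def main(args: Array[String]): Unit = {
    println(s"task id matches materialized:        ${viaTaskId == materialized}")
    println(s"defining index matches materialized: ${viaDefiningIndex == materialized}")
  }
}
```

In this sketch only the rows of the second parent partition differ when the task id is used, which mirrors why the values could change after `coalesce` or `union` even though the column definition itself was untouched.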
Showing changed files:
- core/src/main/scala/org/apache/spark/rdd/RDD.scala (14 additions, 2 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala (15 additions, 4 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala (1 addition, 1 deletion)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala (6 additions, 5 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala (14 additions, 8 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala (6 additions, 7 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (14 additions, 0 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala (13 additions, 5 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala (4 additions, 0 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala (14 additions, 4 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala (4 additions, 0 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala (4 additions, 0 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala (9 additions, 1 deletion)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala (0 additions, 4 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala (8 additions, 6 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (1 addition, 0 deletions)
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala (4 additions, 1 deletion)
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenExpressionCachingSuite.scala (8 additions, 5 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (4 additions, 2 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala (2 additions, 1 deletion)