Skip to content
Snippets Groups Projects
Commit a92e095e authored by Dongjoon Hyun's avatar Dongjoon Hyun Committed by Wenchen Fan
Browse files

[SPARK-21041][SQL] SparkSession.range should be consistent with SparkContext.range

## What changes were proposed in this pull request?

This PR fixes the inconsistency in `SparkSession.range`.

**BEFORE**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array(9223372036854775804, 9223372036854775805, 9223372036854775806)
```

**AFTER**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array()
```

## How was this patch tested?

Pass the Jenkins with newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18257 from dongjoon-hyun/SPARK-21041.
parent e6eb02df
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
......@@ -347,8 +347,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
.map(i => InternalRow(i)) :: Nil
val rdd = if (start == end || (start < end ^ 0 < step)) {
new EmptyRDD[InternalRow](sqlContext.sparkContext)
} else {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
}
rdd :: Nil
}
protected override def doProduce(ctx: CodegenContext): String = {
......
......@@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
}
}
test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") {
val start = java.lang.Long.MAX_VALUE - 3
val end = java.lang.Long.MIN_VALUE + 2
Seq("false", "true").foreach { value =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
assert(spark.range(start, end, 1).collect.length == 0)
assert(spark.range(start, start, 1).collect.length == 0)
}
}
}
}
object DataFrameRangeSuite {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment