From 297c20226d3330309c9165d789749458f8f4ab8e Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Tue, 22 Mar 2016 11:37:37 -0700
Subject: [PATCH] [SPARK-14063][SQL] SQLContext.range should return
 Dataset[java.lang.Long]

## What changes were proposed in this pull request?

This patch changes the return type of SQLContext.range from `Dataset[Long]` (Scala primitive) to `Dataset[java.lang.Long]` (Java boxed long).

Previously, SPARK-13894 changed the return type of range from `Dataset[Row]` to `Dataset[Long]`. The problem is that, due to https://issues.scala-lang.org/browse/SI-4388, Scala compiles primitive types in generics down to just Object, so at the bytecode level range now returns `Dataset[Object]`. This is really bad for Java users: they lose type safety and have to add a type cast every time they use range.

I talked to Jason Zaugg from Lightbend (Typesafe), who suggested that the best approach is to return `Dataset[java.lang.Long]`. The downside is that when Scala users want to explicitly type a closure used on the dataset returned by range, they need to use `java.lang.Long` instead of the Scala `Long`.
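For example, here is a minimal sketch of the Scala-side difference (illustrative only, not part of this patch; it assumes a `sqlContext` in scope with `sqlContext.implicits._` imported so the mapped results have an encoder):

```scala
val ds = sqlContext.range(10)   // now statically typed as Dataset[java.lang.Long]

// Untyped closures are unaffected; java.lang.Long unboxes implicitly.
ds.map(_ + 1).reduce(_ + _)     // 55

// An explicitly typed closure must name java.lang.Long, not Scala's Long.
ds.map { case i: java.lang.Long => i + 1 }.reduce(_ + _)   // 55
```

This mirrors the new test case added to DatasetSuite below.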
## How was this patch tested?

The signature change should be covered by existing unit tests and API tests. I also added a new test case in DatasetSuite for range.

Author: Reynold Xin <rxin@databricks.com>

Closes #11880 from rxin/SPARK-14063.
---
 .../main/scala/org/apache/spark/sql/SQLContext.scala   | 10 +++++-----
 .../test/org/apache/spark/sql/JavaDataFrameSuite.java  |  4 ++--
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala |  9 +++++++++
 3 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d562f55e9f..efaccec262 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -725,7 +725,7 @@ class SQLContext private[sql](
    * @group dataset
    */
   @Experimental
-  def range(end: Long): Dataset[Long] = range(0, end)
+  def range(end: Long): Dataset[java.lang.Long] = range(0, end)
 
   /**
    * :: Experimental ::
@@ -736,7 +736,7 @@ class SQLContext private[sql](
    * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long): Dataset[Long] = {
+  def range(start: Long, end: Long): Dataset[java.lang.Long] = {
     range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism)
   }
 
@@ -749,7 +749,7 @@ class SQLContext private[sql](
    * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long): Dataset[Long] = {
+  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
     range(start, end, step, numPartitions = sparkContext.defaultParallelism)
   }
 
@@ -763,8 +763,8 @@ class SQLContext private[sql](
    * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = {
-    new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder)
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
+    new Dataset(this, Range(start, end, step, numPartitions), Encoders.LONG)
   }
 
   /**
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index cf764c645f..10ee7d57c7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -329,7 +329,7 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testCountMinSketch() {
-    Dataset df = context.range(1000);
+    Dataset<Long> df = context.range(1000);
 
     CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
     Assert.assertEquals(sketch1.totalCount(), 1000);
@@ -354,7 +354,7 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testBloomFilter() {
-    Dataset df = context.range(1000);
+    Dataset<Long> df = context.range(1000);
 
     BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
     Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 04d3a25fcb..677f84eb60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -44,6 +44,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       1, 1, 1)
   }
 
+  test("range") {
+    assert(sqlContext.range(10).map(_ + 1).reduce(_ + _) == 55)
+    assert(sqlContext.range(10).map{ case i: java.lang.Long => i + 1 }.reduce(_ + _) == 55)
+    assert(sqlContext.range(0, 10).map(_ + 1).reduce(_ + _) == 55)
+    assert(sqlContext.range(0, 10).map{ case i: java.lang.Long => i + 1 }.reduce(_ + _) == 55)
+    assert(sqlContext.range(0, 10, 1, 2).map(_ + 1).reduce(_ + _) == 55)
+    assert(sqlContext.range(0, 10, 1, 2).map{ case i: java.lang.Long => i + 1 }.reduce(_ + _) == 55)
+  }
+
   test("SPARK-12404: Datatype Helper Serializability") {
     val ds = sparkContext.parallelize((
       new Timestamp(0),
--
GitLab