From c183b92c3c70ad2d36a2d60bdb10c02b65bc0212 Mon Sep 17 00:00:00 2001 From: bpaulin <bob@bobpaulin.com> Date: Sat, 26 Jul 2014 10:27:09 -0700 Subject: [PATCH] [SPARK-2279] Added emptyRDD method to Java API Added emptyRDD method to Java API with tests. Author: bpaulin <bob@bobpaulin.com> Closes #1597 from bobpaulin/SPARK-2279 and squashes the following commits: 5ad57c2 [bpaulin] [SPARK-2279] Added emptyRDD method to Java API --- .../org/apache/spark/api/java/JavaSparkContext.scala | 9 ++++++++- core/src/test/java/org/apache/spark/JavaAPISuite.java | 9 +++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index a678355a1c..8a5f8088a0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -34,7 +34,7 @@ import org.apache.spark._ import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam} import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{EmptyRDD, RDD} /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns @@ -135,6 +135,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices) } + /** Get an RDD that has no partitions or elements. */ + def emptyRDD[T]: JavaRDD[T] = { + implicit val ctag: ClassTag[T] = fakeClassTag + JavaRDD.fromRDD(new EmptyRDD[T](sc)) + } + + /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T]): JavaRDD[T] = parallelize(list, sc.defaultParallelism) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index b2868b59ce..f882a8623f 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -118,8 +118,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> intersections = s1.intersection(s2); Assert.assertEquals(3, intersections.count()); - List<Integer> list = new ArrayList<Integer>(); - JavaRDD<Integer> empty = sc.parallelize(list); + JavaRDD<Integer> empty = sc.emptyRDD(); JavaRDD<Integer> emptyIntersection = empty.intersection(s2); Assert.assertEquals(0, emptyIntersection.count()); @@ -184,6 +183,12 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2)); } + @Test + public void emptyRDD() { + JavaRDD<String> rdd = sc.emptyRDD(); + Assert.assertEquals("Empty RDD shouldn't have any values", 0, rdd.count()); + } + @Test public void sortBy() { List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); -- GitLab