From b51d733a5783ef29077951e842882bb002a4139e Mon Sep 17 00:00:00 2001 From: Matei Zaharia <matei@eecs.berkeley.edu> Date: Fri, 27 Jul 2012 12:23:27 -0700 Subject: [PATCH] Fixed Java union methods having same erasure. Changed union() methods on lists to take a separate "first element" argument in order to differentiate them to the compiler, because Java 7 considered it an error to have them all take Lists parameterized with different types. --- .../spark/api/java/JavaSparkContext.scala | 23 +++++----- .../JavaSparkContextVarargsWorkaround.java | 42 ++++++++++++++----- core/src/test/scala/spark/JavaAPISuite.java | 7 ++-- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index 00e1aff58c..2d43bfa4ef 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -174,22 +174,23 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)) } - override def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = { - val rdds: Seq[RDD[T]] = asScalaBuffer(jrdds).map(_.rdd) - implicit val cm: ClassManifest[T] = jrdds.head.classManifest + override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { + val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) + implicit val cm: ClassManifest[T] = first.classManifest sc.union(rdds: _*)(cm) } - override def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = { - val rdds: Seq[RDD[(K, V)]] = asScalaBuffer(jrdds).map(_.rdd) - implicit val cm: ClassManifest[(K, V)] = jrdds.head.classManifest - implicit val kcm: ClassManifest[K] = jrdds.head.kManifest - implicit val vcm: ClassManifest[V] = jrdds.head.vManifest + override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) + : JavaPairRDD[K, V] = { + val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) + implicit val cm: ClassManifest[(K, V)] = first.classManifest + implicit val kcm: ClassManifest[K] = first.kManifest + implicit val vcm: ClassManifest[V] = first.vManifest new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm) } - override def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { - val rdds: Seq[RDD[Double]] = asScalaBuffer(jrdds).map(_.srdd) + override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { + val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd) new JavaDoubleRDD(sc.union(rdds: _*)) } @@ -215,4 +216,4 @@ object JavaSparkContext { implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc) implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc -} \ No newline at end of file +} diff --git a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java index 505a090f67..97344e73da 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java +++ b/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java @@ -1,25 +1,47 @@ package spark.api.java; import java.util.Arrays; +import java.util.ArrayList; import java.util.List; // See // http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html abstract class JavaSparkContextVarargsWorkaround { - public <T> JavaRDD<T> union(JavaRDD<T> ... rdds) { - return union(Arrays.asList(rdds)); + public <T> JavaRDD<T> union(JavaRDD<T>... rdds) { + if (rdds.length == 0) { + throw new IllegalArgumentException("Union called on empty list"); + } + ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1); + for (int i = 1; i < rdds.length; i++) { + rest.add(rdds[i]); + } + return union(rdds[0], rest); } - public JavaDoubleRDD union(JavaDoubleRDD ... rdds) { - return union(Arrays.asList(rdds)); + public JavaDoubleRDD union(JavaDoubleRDD... rdds) { + if (rdds.length == 0) { + throw new IllegalArgumentException("Union called on empty list"); + } + ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1); + for (int i = 1; i < rdds.length; i++) { + rest.add(rdds[i]); + } + return union(rdds[0], rest); } - public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> ... rdds) { - return union(Arrays.asList(rdds)); + public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) { + if (rdds.length == 0) { + throw new IllegalArgumentException("Union called on empty list"); + } + ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1); + for (int i = 1; i < rdds.length; i++) { + rest.add(rdds[i]); + } + return union(rdds[0], rest); } - abstract public <T> JavaRDD<T> union(List<JavaRDD<T>> rdds); - abstract public JavaDoubleRDD union(List<JavaDoubleRDD> rdds); - abstract public <K, V> JavaPairRDD<K, V> union(List<JavaPairRDD<K, V>> rdds); - + // These methods take separate "first" and "rest" elements to avoid having the same type erasure + abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest); + abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest); + abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest); } diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 436a8ab0c7..5f0293e55b 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -66,10 +66,9 @@ public class JavaAPISuite implements Serializable { JavaRDD<String> sUnion = sc.union(s1, s2); Assert.assertEquals(4, sUnion.count()); // List - List<JavaRDD<String>> srdds = new ArrayList<JavaRDD<String>>(); - srdds.add(s1); - srdds.add(s2); - sUnion = sc.union(srdds); + List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>(); + list.add(s2); + sUnion = sc.union(s1, list); Assert.assertEquals(4, sUnion.count()); // Union of JavaDoubleRDDs -- GitLab