Skip to content
Snippets Groups Projects
Commit b51d733a authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fixed Java union methods having same erasure.

Changed union() methods on lists to take a separate "first element"
argument in order to differentiate them to the compiler, because Java 7
considered it an error to have them all take Lists parameterized with
different types.
parent 5c5aa2ff
No related branches found
No related tags found
No related merge requests found
......@@ -174,22 +174,23 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
override def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = asScalaBuffer(jrdds).map(_.rdd)
implicit val cm: ClassManifest[T] = jrdds.head.classManifest
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[T] = first.classManifest
sc.union(rdds: _*)(cm)
}
override def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = asScalaBuffer(jrdds).map(_.rdd)
implicit val cm: ClassManifest[(K, V)] = jrdds.head.classManifest
implicit val kcm: ClassManifest[K] = jrdds.head.kManifest
implicit val vcm: ClassManifest[V] = jrdds.head.vManifest
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[(K, V)] = first.classManifest
implicit val kcm: ClassManifest[K] = first.kManifest
implicit val vcm: ClassManifest[V] = first.vManifest
new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
}
override def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
val rdds: Seq[RDD[Double]] = asScalaBuffer(jrdds).map(_.srdd)
override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
new JavaDoubleRDD(sc.union(rdds: _*))
}
......@@ -215,4 +216,4 @@ object JavaSparkContext {
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
}
\ No newline at end of file
}
package spark.api.java;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
// See
// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
abstract class JavaSparkContextVarargsWorkaround {
public <T> JavaRDD<T> union(JavaRDD<T> ... rdds) {
return union(Arrays.asList(rdds));
public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
if (rdds.length == 0) {
throw new IllegalArgumentException("Union called on empty list");
}
ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
for (int i = 1; i < rdds.length; i++) {
rest.add(rdds[i]);
}
return union(rdds[0], rest);
}
public JavaDoubleRDD union(JavaDoubleRDD ... rdds) {
return union(Arrays.asList(rdds));
public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
if (rdds.length == 0) {
throw new IllegalArgumentException("Union called on empty list");
}
ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
for (int i = 1; i < rdds.length; i++) {
rest.add(rdds[i]);
}
return union(rdds[0], rest);
}
public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> ... rdds) {
return union(Arrays.asList(rdds));
public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
if (rdds.length == 0) {
throw new IllegalArgumentException("Union called on empty list");
}
ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
for (int i = 1; i < rdds.length; i++) {
rest.add(rdds[i]);
}
return union(rdds[0], rest);
}
abstract public <T> JavaRDD<T> union(List<JavaRDD<T>> rdds);
abstract public JavaDoubleRDD union(List<JavaDoubleRDD> rdds);
abstract public <K, V> JavaPairRDD<K, V> union(List<JavaPairRDD<K, V>> rdds);
// These methods take separate "first" and "rest" elements to avoid having the same type erasure
abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
}
......@@ -66,10 +66,9 @@ public class JavaAPISuite implements Serializable {
JavaRDD<String> sUnion = sc.union(s1, s2);
Assert.assertEquals(4, sUnion.count());
// List
List<JavaRDD<String>> srdds = new ArrayList<JavaRDD<String>>();
srdds.add(s1);
srdds.add(s2);
sUnion = sc.union(srdds);
List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
list.add(s2);
sUnion = sc.union(s1, list);
Assert.assertEquals(4, sUnion.count());
// Union of JavaDoubleRDDs
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment