Skip to content
Snippets Groups Projects
Commit 743a889d authored by Saldanha's avatar Saldanha Committed by Josh Rosen
Browse files

[SPARK-4459] Change groupBy type parameter from K to U

Please see https://issues.apache.org/jira/browse/SPARK-4459

Author: Saldanha <saldaal1@phusca-l24858.wlan.na.novartis.net>

Closes #3327 from alokito/master and squashes the following commits:

54b1095 [Saldanha] [SPARK-4459] changed type parameter for keyBy from K to U
d5f73c3 [Saldanha] [SPARK-4459] added keyBy test
316ad77 [Saldanha] SPARK-4459 changed type parameter for groupBy from K to U.
62ddd4b [Saldanha] SPARK-4459 added failing unit test
parent 794f3aec
No related branches found
No related tags found
No related merge requests found
...@@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { ...@@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key. * mapping to that key.
*/ */
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = { def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag))) JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
} }
...@@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { ...@@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key. * mapping to that key.
*/ */
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = { def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K]))) JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
} }
/** /**
...@@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { ...@@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/** /**
* Creates tuples of the elements in this RDD by applying `f`. * Creates tuples of the elements in this RDD by applying `f`.
*/ */
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = { def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
implicit val ctag: ClassTag[K] = fakeClassTag // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f)) JavaPairRDD.fromRDD(rdd.keyBy(f))
} }
......
...@@ -323,6 +323,47 @@ public class JavaAPISuite implements Serializable { ...@@ -323,6 +323,47 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
} }
@Test
public void groupByOnPairRDD() {
  // Regression test for SPARK-4459: exercises both groupBy overloads on a JavaPairRDD,
  // whose elements are tuples.
  JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
  // Pair every number with itself: (1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13).
  JavaPairRDD<Integer, Integer> pairRDD = numbers.zip(numbers);

  // Grouping key: true when both components of the pair are even, false otherwise.
  Function<Tuple2<Integer, Integer>, Boolean> bothEven =
    new Function<Tuple2<Integer, Integer>, Boolean>() {
      @Override
      public Boolean call(Tuple2<Integer, Integer> pair) {
        if (pair._1() % 2 != 0) {
          return false;
        }
        return pair._2() % 2 == 0;
      }
    };

  // Overload without an explicit partition count.
  JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> grouped = pairRDD.groupBy(bothEven);
  Assert.assertEquals(2, grouped.count());
  Assert.assertEquals(2, Iterables.size(grouped.lookup(true).get(0)));  // Evens
  Assert.assertEquals(5, Iterables.size(grouped.lookup(false).get(0))); // Odds

  // Overload taking numPartitions; the grouping result must be the same.
  JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> groupedInOnePartition =
    pairRDD.groupBy(bothEven, 1);
  Assert.assertEquals(2, groupedInOnePartition.count());
  Assert.assertEquals(2, Iterables.size(groupedInOnePartition.lookup(true).get(0)));  // Evens
  Assert.assertEquals(5, Iterables.size(groupedInOnePartition.lookup(false).get(0))); // Odds
}
@SuppressWarnings("unchecked")
@Test
public void keyByOnPairRDD() {
  // Regression test for SPARK-4459: exercises keyBy on a JavaPairRDD, whose elements
  // are tuples.
  JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
  // Pair every number with itself, e.g. (1,1), (2,2), ...
  JavaPairRDD<Integer, Integer> pairRDD = numbers.zip(numbers);

  // Key function: the decimal string of the sum of the pair's two components.
  Function<Tuple2<Integer, Integer>, String> sumAsString =
    new Function<Tuple2<Integer, Integer>, String>() {
      @Override
      public String call(Tuple2<Integer, Integer> pair) {
        int sum = pair._1() + pair._2();
        return String.valueOf(sum);
      }
    };

  JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumAsString);

  // keyBy keeps one entry per input pair.
  Assert.assertEquals(7, keyed.count());

  // Key "2" is produced by the pair (1, 1), so the first component of the match is 1.
  Tuple2<Integer, Integer> firstWithKeyTwo = keyed.lookup("2").get(0);
  Assert.assertEquals(1, (long) firstWithKeyTwo._1());
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void cogroup() { public void cogroup() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment