diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 08ae06e865e3a91c20f6b6fc9c4dec9bdadf24fb..d3e206b3537f15596b114c605fd8b789874060d4 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -438,7 +438,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
         val res = self.context.runJob(self, process _, Array(index), false)
         res(0)
       case None =>
-        throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
+        self.filter(_._1 == key).map(_._2).collect
     }
   }
 
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 46a0b68f890de7bcccef9c110494244100c2ba1a..33d5fc2d890b66137d7fe4975ac5ca00f0e78a9e 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -130,6 +130,17 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @Test
+  public void lookup() {
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<String, String>("Apples", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Citrus")
+    ));
+    Assert.assertEquals(2, categories.lookup("Oranges").size());
+    Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+  }
+
   @Test
   public void groupBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
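
A minimal Scala sketch of what this change enables, assuming the usual local-mode setup of this era of the codebase (package spark, implicits from spark.SparkContext._); the object name, app name, and data are illustrative, not part of the patch. An RDD built with parallelize() has no partitioner, so lookup() now falls back to filtering every partition and collecting the matches instead of throwing UnsupportedOperationException; an RDD produced by groupByKey() has a HashPartitioner, so lookup() still runs a job on only the partition that owns the key.

    import spark.SparkContext
    import spark.SparkContext._  // implicit conversion to PairRDDFunctions

    object LookupExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "lookup-example")  // hypothetical app name

        // No partitioner here: lookup() takes the new filter-and-collect path.
        val categories = sc.parallelize(Seq(
          ("Apples", "Fruit"),
          ("Oranges", "Fruit"),
          ("Oranges", "Citrus")))
        println(categories.lookup("Oranges"))  // ArrayBuffer(Fruit, Citrus)

        // groupByKey() assigns a HashPartitioner, so lookup() scans only one partition.
        println(categories.groupByKey().lookup("Oranges").head)

        sc.stop()
      }
    }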