Skip to content
Snippets Groups Projects
Commit 7341de0d authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #475 from JoshRosen/spark-668

Remove hack workaround for SPARK-668
parents fcf7fa84 e9fb2542
No related branches found
No related tags found
No related merge requests found
...@@ -12,7 +12,7 @@ import spark.storage.StorageLevel ...@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import com.google.common.base.Optional import com.google.common.base.Optional
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] { trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This def wrapRDD(rdd: RDD[T]): This
implicit val classManifest: ClassManifest[T] implicit val classManifest: ClassManifest[T]
...@@ -82,12 +82,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround ...@@ -82,12 +82,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
} }
/** /**
* Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java. * Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/ */
private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = { def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType()) JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
} }
...@@ -110,8 +111,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround ...@@ -110,8 +111,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
/** /**
* Return a new RDD by applying a function to each partition of this RDD. * Return a new RDD by applying a function to each partition of this RDD.
*/ */
def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]): def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K, V] = { JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
} }
......
package spark.api.java;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDDLike;
import spark.api.java.function.PairFlatMapFunction;
import java.io.Serializable;
/**
 * Workaround for SPARK-668: lets Java callers use {@code flatMap} with a
 * {@link PairFlatMapFunction} despite the Scala-side overload limitation.
 */
class PairFlatMapWorkaround<T> implements Serializable {
    /**
     * Return a new RDD by first applying a function to all elements of this
     * RDD, and then flattening the results.
     */
    public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
        // NOTE(review): this cast assumes the class is only ever mixed into
        // JavaRDDLike implementations — confirmed by its single use site;
        // delegate to the Scala-side doFlatMap implementation.
        JavaRDDLike<T, ?> self = (JavaRDDLike<T, ?>) this;
        return self.doFlatMap(f);
    }
}
0% — Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment