diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index cd0aea0cb3d1f18c4af0ae14821c88fa483637ef..3f672900cb90fffda417b35df4c9c50fe16645c9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -17,35 +17,29 @@ package org.apache.spark.api.java -import java.util.{List => JList} -import java.util.Comparator +import java.util.{Comparator, List => JList} -import scala.Tuple2 import scala.collection.JavaConversions._ import scala.reflect.ClassTag import com.google.common.base.Optional +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.conf.Configuration -import org.apache.spark.HashPartitioner -import org.apache.spark.Partitioner +import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ import org.apache.spark.SparkContext.rddToPairRDDFunctions -import org.apache.spark.api.java.function.{Function2 => JFunction2} -import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.partial.BoundedDouble -import org.apache.spark.partial.PartialResult -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.OrderedRDDFunctions +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.{OrderedRDDFunctions, RDD} import org.apache.spark.storage.StorageLevel - -class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K], - implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { +class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) + (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V]) + extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) @@ -158,7 +152,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C] = { - implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + implicit val ctag: ClassTag[C] = fakeClassTag fromRDD(rdd.combineByKey( createCombiner, mergeValue, @@ -284,19 +278,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K * RDD will be <= us. */ def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = { - implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val ctag: ClassTag[W] = fakeClassTag fromRDD(rdd.subtractByKey(other)) } /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = { - implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val ctag: ClassTag[W] = fakeClassTag fromRDD(rdd.subtractByKey(other, numPartitions)) } /** Return an RDD with the pairs from `this` whose keys are not in `other`. 
*/ def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = { - implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val ctag: ClassTag[W] = fakeClassTag fromRDD(rdd.subtractByKey(other, p)) } @@ -345,7 +339,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = { - implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + implicit val ctag: ClassTag[C] = fakeClassTag fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd))) } @@ -438,7 +432,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K * this also retains the original RDD's partitioning. */ def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = { - implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val ctag: ClassTag[U] = fakeClassTag fromRDD(rdd.mapValues(f)) } @@ -449,7 +443,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala - implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val ctag: ClassTag[U] = fakeClassTag fromRDD(rdd.flatMapValues(fn)) } @@ -682,31 +676,35 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K } object JavaPairRDD { - def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K], - vcm: ClassTag[T]): RDD[(K, JList[T])] = - rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _) - - def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K], - vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd) - .mapValues((x: (Seq[V], Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2))) - - def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1], - Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1], - JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues( - (x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1), - seqAsJavaList(x._2), - seqAsJavaList(x._3))) - - def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = + private[spark] + def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = { + rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList) + } + + private[spark] + def cogroupResultToJava[K: ClassTag, V, W]( + rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = { + rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2))) + } + + private[spark] + def cogroupResult2ToJava[K: ClassTag, V, W1, W2]( + rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = { + rddToPairRDDFunctions(rdd) + .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3))) + } + + def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = { new JavaPairRDD[K, V](rdd) + } implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd /** Convert a JavaRDD of key-value pairs to JavaPairRDD. 
*/ def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = { - implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + implicit val ctagK: ClassTag[K] = fakeClassTag + implicit val ctagV: ClassTag[V] = fakeClassTag new JavaPairRDD[K, V](rdd.rdd) } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 7d48ce01cf2cc9606c786e14018ad8bacf2f7876..7b73057953552fe6bca94db0c2d3e7a7820c392f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -24,8 +24,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel -class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends -JavaRDDLike[T, JavaRDD[T]] { +class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) + extends JavaRDDLike[T, JavaRDD[T]] { override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index fcb9729c10a6d17e3eda3f45fd5bd218e7a85c65..24a9925dbd22cb5a4944eb536c2dd6de72ddffe7 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,7 +17,8 @@ package org.apache.spark.api.java -import java.util.{List => JList, Comparator} +import java.util.{Comparator, List => JList} + import scala.Tuple2 import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -25,14 +26,14 @@ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{SparkContext, Partition, TaskContext} -import org.apache.spark.rdd.RDD +import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.api.java.JavaPairRDD._ -import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} -import org.apache.spark.partial.{PartialResult, BoundedDouble} +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _} +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel - trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def wrapRDD(rdd: RDD[T]): This @@ -88,8 +89,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to all elements of this RDD. 
*/ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { - def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] - new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType()) + val ctag = implicitly[ClassTag[Tuple2[K2, V2]]] + new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType()) } /** @@ -119,8 +120,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] - JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType()) + val ctag = implicitly[ClassTag[Tuple2[K2, V2]]] + JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType()) } /** @@ -202,10 +203,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * mapping to that key. */ def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { - implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val vcm: ClassTag[JList[T]] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]] - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm) + implicit val ctagK: ClassTag[K] = fakeClassTag + implicit val ctagV: ClassTag[JList[T]] = fakeClassTag + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType))) } /** @@ -213,10 +213,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * mapping to that key. */ def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { - implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val vcm: ClassTag[JList[T]] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]] - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm) + implicit val ctagK: ClassTag[K] = fakeClassTag + implicit val ctagV: ClassTag[JList[T]] = fakeClassTag + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType))) } /** @@ -407,7 +406,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Creates tuples of the elements in this RDD by applying `f`. 
*/ def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = { - implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val ctag: ClassTag[K] = fakeClassTag JavaPairRDD.fromRDD(rdd.keyBy(f)) } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 22dc9c9e2ecfef936254d8c8d611463f70996263..dc26b7f621fee012736f8c52c63b6950ae4abf7c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -23,19 +23,17 @@ import scala.collection.JavaConversions import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import com.google.common.base.Optional import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.InputFormat -import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.{InputFormat, JobConf} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import com.google.common.base.Optional import org.apache.spark._ -import org.apache.spark.SparkContext.IntAccumulatorParam -import org.apache.spark.SparkContext.DoubleAccumulatorParam +import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam} +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD - /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. @@ -96,7 +94,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { - implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + implicit val ctag: ClassTag[T] = fakeClassTag sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices) } @@ -107,8 +105,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** Distribute a local Scala collection to form an RDD. 
*/ def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int) : JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val vcm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + implicit val ctagK: ClassTag[K] = fakeClassTag + implicit val ctagV: ClassTag[V] = fakeClassTag JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)) } @@ -149,8 +147,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork valueClass: Class[V], minSplits: Int ): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(keyClass) - implicit val vcm: ClassTag[V] = ClassTag(valueClass) + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits)) } @@ -163,8 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(keyClass) - implicit val vcm: ClassTag[V] = ClassTag(valueClass) + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass)) } @@ -176,8 +174,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * that there's very little effort required to save arbitrary objects. */ def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = { - implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - sc.objectFile(path, minSplits)(cm) + implicit val ctag: ClassTag[T] = fakeClassTag + sc.objectFile(path, minSplits)(ctag) } /** @@ -188,8 +186,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * that there's very little effort required to save arbitrary objects. 
*/ def objectFile[T](path: String): JavaRDD[T] = { - implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - sc.objectFile(path)(cm) + implicit val ctag: ClassTag[T] = fakeClassTag + sc.objectFile(path)(ctag) } /** @@ -209,8 +207,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork valueClass: Class[V], minSplits: Int ): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(keyClass) - implicit val vcm: ClassTag[V] = ClassTag(valueClass) + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits)) } @@ -229,8 +227,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork keyClass: Class[K], valueClass: Class[V] ): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(keyClass) - implicit val vcm: ClassTag[V] = ClassTag(valueClass) + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)) } @@ -248,8 +246,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork valueClass: Class[V], minSplits: Int ): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(keyClass) - implicit val vcm: ClassTag[V] = ClassTag(valueClass) + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)) } @@ -266,8 +264,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork keyClass: Class[K], valueClass: Class[V] ): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(keyClass) - implicit val vcm: ClassTag[V] = ClassTag(valueClass) + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)) } @@ -287,8 +285,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork kClass: Class[K], vClass: Class[V], conf: Configuration): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(kClass) - implicit val vcm: ClassTag[V] = ClassTag(vClass) + implicit val ctagK: ClassTag[K] = ClassTag(kClass) + implicit val ctagV: ClassTag[V] = ClassTag(vClass) new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)) } @@ -306,26 +304,26 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork fClass: Class[F], kClass: Class[K], vClass: Class[V]): JavaPairRDD[K, V] = { - implicit val kcm: ClassTag[K] = ClassTag(kClass) - implicit val vcm: ClassTag[V] = ClassTag(vClass) + implicit val ctagK: ClassTag[K] = ClassTag(kClass) + implicit val ctagV: ClassTag[V] = ClassTag(vClass) new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)) } /** Build the union of two or more RDDs. */ override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) - implicit val cm: ClassTag[T] = first.classTag - sc.union(rdds)(cm) + implicit val ctag: ClassTag[T] = first.classTag + sc.union(rdds) } /** Build the union of two or more RDDs. 
*/ override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) : JavaPairRDD[K, V] = { val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) - implicit val cm: ClassTag[(K, V)] = first.classTag - implicit val kcm: ClassTag[K] = first.kClassTag - implicit val vcm: ClassTag[V] = first.vClassTag - new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm) + implicit val ctag: ClassTag[(K, V)] = first.classTag + implicit val ctagK: ClassTag[K] = first.kClassTag + implicit val ctagV: ClassTag[V] = first.vClassTag + new JavaPairRDD(sc.union(rdds)) } /** Build the union of two or more RDDs. */ @@ -447,8 +445,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir) protected def checkpointFile[T](path: String): JavaRDD[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + implicit val ctag: ClassTag[T] = fakeClassTag new JavaRDD(sc.checkpointFile(path)) } @@ -535,4 +532,18 @@ object JavaSparkContext { * your driver program. */ def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as the Java compiler + * cannot produce them automatically. While this ClassTag-faking does please the compiler, + * it can cause problems at runtime if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse performance + * or security issues. For instance, an Array[AnyRef] can hold any type T, but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] }
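
A few notes on the patterns this patch consolidates, with small self-contained Scala sketches. All object and method names in the sketches (FakeTagEquivalence, describeBound, buildArray, and so on) are illustrative only, not part of the patch.

First, the repeated `implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]` incantation is replaced by one shared helper. The two spellings are interchangeable, since `implicitly[ClassTag[AnyRef]]` resolves to a tag with the same erased class as `ClassTag.AnyRef`; a minimal sketch of the equivalence, with the helper re-declared locally:

```scala
import scala.reflect.ClassTag

object FakeTagEquivalence {
  // Old per-call-site idiom, as removed throughout this patch:
  def oldFake[T]: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

  // New shared helper, as added on JavaSparkContext (re-declared here):
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

  def main(args: Array[String]): Unit = {
    // ClassTag equality compares the erased runtime class, so the two
    // idioms produce indistinguishable tags.
    println(oldFake[String] == fakeClassTag[String])  // true
  }
}
```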
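In `object JavaPairRDD`, the `groupByResultToJava`/`cogroup*ToJava` helpers move from explicit implicit parameter lists to context bounds, and shed tags they never used (the old `vcm: ClassTag[T]` on `groupByResultToJava`, for instance). A context bound is pure sugar for the explicit form, as this sketch with a hypothetical `describe` pair shows:

```scala
import scala.reflect.ClassTag

object ContextBoundDemo {
  // Explicit implicit parameter, as in the old helper signatures:
  def describeExplicit[K](xs: Seq[K])(implicit kcm: ClassTag[K]): String =
    s"${xs.size} element(s) tagged $kcm"

  // Context bound, as in the new signatures; the compiler rewrites this
  // into exactly the form above:
  def describeBound[K: ClassTag](xs: Seq[K]): String =
    s"${xs.size} element(s) tagged ${implicitly[ClassTag[K]]}"

  def main(args: Array[String]): Unit = {
    println(describeExplicit(Seq(1, 2, 3)))  // 3 element(s) tagged Int
    println(describeBound(Seq(1, 2, 3)))     // 3 element(s) tagged Int
  }
}
```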
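The `map` and `flatMap` changes in JavaRDDLike rely on the compiler materializing `ClassTag[(K2, V2)]` directly, with no evidence for `K2` or `V2`. That works because a ClassTag records only the erased class, which for any pair type is `scala.Tuple2`, so the old detour through `ClassTag[Tuple2[_, _]]` plus a cast was never needed. A sketch:

```scala
import scala.reflect.ClassTag

object TupleTagDemo {
  // Compiles with no ClassTag evidence for K2 or V2: the materialized tag
  // captures only the erasure of the outer type, which is always Tuple2.
  def pairTag[K2, V2]: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]

  def main(args: Array[String]): Unit = {
    println(pairTag[Int, String].runtimeClass)  // class scala.Tuple2
  }
}
```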
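Call sites such as `groupBy`, `union`, `checkpointFile` and `objectFile` also stop passing tags positionally (the old `(kcm, vcm)` and `(cm)` argument lists). Declaring the tags as `implicit val`s is enough: the compiler threads them into any context bound in scope. A sketch of both styles against a stand-in for `fromRDD`:

```scala
import scala.reflect.ClassTag

object ImplicitThreadingDemo {
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

  // Stands in for fromRDD, sc.union and friends, which demand tags
  // through context bounds:
  def needsTags[K: ClassTag, V: ClassTag]: String =
    s"${implicitly[ClassTag[K]]} / ${implicitly[ClassTag[V]]}"

  def beforeStyle[K, V]: String = {
    val kcm: ClassTag[K] = fakeClassTag
    val vcm: ClassTag[V] = fakeClassTag
    needsTags[K, V](kcm, vcm)  // tags threaded by hand
  }

  def afterStyle[K, V]: String = {
    implicit val ctagK: ClassTag[K] = fakeClassTag
    implicit val ctagV: ClassTag[V] = fakeClassTag
    needsTags[K, V]            // the compiler supplies both tags
  }

  def main(args: Array[String]): Unit = {
    println(beforeStyle[String, Int] == afterStyle[String, Int])  // true
  }
}
```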
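Note the deliberate split the patch keeps: `sequenceFile`, `hadoopRDD` and the other Hadoop entry points build genuine tags with `ClassTag(keyClass)`, because a `Class[_]` object is at hand there; only the signatures where Java callers cannot supply any class information fall back to `fakeClassTag`. A sketch of the genuine construction:

```scala
import scala.reflect.ClassTag

object ClassObjectTagDemo {
  def main(args: Array[String]): Unit = {
    // ClassTag.apply builds a genuine tag from a Class[_] object, so
    // arrays and casts behave exactly as for a compiler-materialized tag:
    val tag: ClassTag[Integer] = ClassTag(classOf[Integer])
    println(tag.runtimeClass)          // class java.lang.Integer
    println(tag.newArray(2).getClass)  // class [Ljava.lang.Integer;
  }
}
```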
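Finally, the caveat in the new scaladoc is easy to observe: arrays allocated through a faked tag come back as `Object[]` rather than a specialized primitive array. The sketch below keeps `T` abstract at the point of use, mirroring how the Scala API consumes these tags (a concrete cast to `Array[Int]` through a faked tag would fail at runtime, which is exactly the correctness risk the scaladoc names):

```scala
import scala.reflect.ClassTag

object SpecializationCaveat {
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

  // T stays abstract here, so no concrete array checkcast is emitted at
  // the call site -- this mirrors how the Scala API consumes faked tags.
  def buildArray[T: ClassTag](n: Int): AnyRef = implicitly[ClassTag[T]].newArray(n)

  def main(args: Array[String]): Unit = {
    // A genuine ClassTag[Int] yields a primitive int[]:
    println(buildArray[Int](3).getClass.getSimpleName)               // int[]

    // The faked tag yields Object[]: fine for reference types, but
    // primitive specialization is lost, as the scaladoc warns.
    println(buildArray[Int](3)(fakeClassTag).getClass.getSimpleName) // Object[]
  }
}
```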