diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 2301caafb07ff70b7f146d40d06a4ee689f65cab..dc1e8f6c21b62e60a16647c2655113b4887d46bb 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -244,6 +244,36 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   }
 }
 
+object AccumulatorParam {
+
+  // The following implicit objects were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, because duplicate code is kept in SparkContext for backward
+  // compatibility, please update it accordingly if you modify the following implicit objects.
+
+  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+    def zero(initialValue: Double) = 0.0
+  }
+
+  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+    def zero(initialValue: Int) = 0
+  }
+
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long) = t1 + t2
+    def zero(initialValue: Long) = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float) = t1 + t2
+    def zero(initialValue: Float) = 0f
+  }
+
+  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
+
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private object Accumulators {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ae8bbfb56f493889452b3c88e9b8327c2d1b2aa9..586c1ccaca72be5c9943c1ebce7418d7281b0e2b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1624,47 +1624,74 @@ object SparkContext extends Logging {
 
   private[spark] val DRIVER_IDENTIFIER = "<driver>"
 
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+  // The following deprecated objects have already been copied to `object AccumulatorParam` to
+  // make the compiler find them automatically. They are duplicates kept only for backward
+  // compatibility; please update `object AccumulatorParam` accordingly if you plan to modify the
+  // following ones.
+
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
   }
 
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int) = 0
   }
 
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long) = t1 + t2
     def zero(initialValue: Long) = 0L
   }
 
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float) = t1 + t2
     def zero(initialValue: Float) = 0f
   }
 
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+  // The following deprecated functions have already been moved to `object RDD` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object RDD`.
 
-  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
       (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairRDDFunctions(rdd)
+    RDD.rddToPairRDDFunctions(rdd)
   }
 
-  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)
 
-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
      rdd: RDD[(K, V)]) =
-    new SequenceFileRDDFunctions(rdd)
+    RDD.rddToSequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
      rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions[K, V, (K, V)](rdd)
+    RDD.rddToOrderedRDDFunctions(rdd)
 
-  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)
 
-  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
-    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+    RDD.numericRDDToDoubleRDDFunctions(rdd)
 
   // Implicit conversions to common Writable types, for saveAsSequenceFile
 
@@ -1690,40 +1717,49 @@ object SparkContext extends Logging {
     arr.map(x => anyToWritable(x)).toArray)
   }
 
-  // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
-    : WritableConverter[T] = {
-    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
-    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
-  }
+  // The following deprecated functions have already been moved to `object WritableConverter` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object WritableConverter`.
 
-  implicit def intWritableConverter(): WritableConverter[Int] =
-    simpleWritableConverter[Int, IntWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def intWritableConverter(): WritableConverter[Int] =
+    WritableConverter.intWritableConverter()
 
-  implicit def longWritableConverter(): WritableConverter[Long] =
-    simpleWritableConverter[Long, LongWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def longWritableConverter(): WritableConverter[Long] =
+    WritableConverter.longWritableConverter()
 
-  implicit def doubleWritableConverter(): WritableConverter[Double] =
-    simpleWritableConverter[Double, DoubleWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def doubleWritableConverter(): WritableConverter[Double] =
+    WritableConverter.doubleWritableConverter()
 
-  implicit def floatWritableConverter(): WritableConverter[Float] =
-    simpleWritableConverter[Float, FloatWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def floatWritableConverter(): WritableConverter[Float] =
+    WritableConverter.floatWritableConverter()
 
-  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
-    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def booleanWritableConverter(): WritableConverter[Boolean] =
+    WritableConverter.booleanWritableConverter()
 
-  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
-      // getBytes method returns array which is longer then data to be returned
-      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
-    )
-  }
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def bytesWritableConverter(): WritableConverter[Array[Byte]] =
+    WritableConverter.bytesWritableConverter()
 
-  implicit def stringWritableConverter(): WritableConverter[String] =
-    simpleWritableConverter[String, Text](_.toString)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def stringWritableConverter(): WritableConverter[String] =
+    WritableConverter.stringWritableConverter()
 
-  implicit def writableWritableConverter[T <: Writable]() =
-    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def writableWritableConverter[T <: Writable]() =
+    WritableConverter.writableWritableConverter()
 
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
@@ -1950,3 +1986,46 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassTag[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
+
+object WritableConverter {
+
+  // Helper objects for converting common types to Writable
+  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+    : WritableConverter[T] = {
+    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, we still keep the old functions in SparkContext for backward
+  // compatibility; they just forward to the following functions.
+
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns an array that is longer than the data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
+  }
+
+  implicit def stringWritableConverter(): WritableConverter[String] =
+    simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index e37f3acaf6e30c8b737545fba82d9d5be1833f50..7af3538262fd64efa48d166985baf55062d4f70e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,13 +32,13 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
 import org.apache.spark.api.java.function.{Function => JFunction,
   Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
+import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 6a6d9bf6857d3b863fe4753e6d240e2b9025ae8e..97f5c9f257e09bd2fbc43b6d4ff74ad259bb318d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext._
+import org.apache.spark.AccumulatorParam._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e4025bcf48db68ce136d1d39d0668972ba436967..3add4a76192ca95a3aa0ef0b0d8bcd2298342cea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -21,6 +21,7 @@ import java.util.{Properties, Random}
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
@@ -28,6 +29,7 @@ import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.spark._
@@ -1383,3 +1385,31 @@ abstract class RDD[T: ClassTag](
     new JavaRDD(this)(elementClassTag)
   }
 }
+
+object RDD {
+
+  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, we still keep the old functions in SparkContext for backward
+  // compatibility; they just forward to the following functions.
+
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
+    new PairRDDFunctions(rdd)
+  }
+
+  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+
+  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
+      rdd: RDD[(K, V)]) =
+    new SequenceFileRDDFunctions(rdd)
+
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
+      rdd: RDD[(K, V)]) =
+    new OrderedRDDFunctions[K, V, (K, V)](rdd)
+
+  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
+
+  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
+}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 9e454ddcc52a6304477016ac0dab4206bee3cfba..1362022104195ffc3fc05a8952507f5628b2026c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -82,7 +82,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     bytesWritable.set(inputArray, 0, 10)
     bytesWritable.set(inputArray, 0, 5)
 
-    val converter = SparkContext.bytesWritableConverter()
+    val converter = WritableConverter.bytesWritableConverter()
     val byteArray = converter.convert(bytesWritable)
     assert(byteArray.length === 5)
 
diff --git a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4918e2d92beb441da8caa8e032e14594c78c09f2
--- /dev/null
+++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sparktest
+
+/**
+ * A test suite to make sure all `implicit` functions work correctly.
+ * Please don't `import org.apache.spark.SparkContext._` in this class.
+ *
+ * As `implicit` is a compiler feature, we don't need to run this class.
+ * All we need to do is make the compiler happy.
+ */
+class ImplicitSuite {
+
+  // We only want to test if `implicit` works well with the compiler, so we don't need a real
+  // SparkContext.
+  def mockSparkContext[T]: org.apache.spark.SparkContext = null
+
+  // We only want to test if `implicit` works well with the compiler, so we don't need a real RDD.
+  def mockRDD[T]: org.apache.spark.rdd.RDD[T] = null
+
+  def testRddToPairRDDFunctions(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
+    rdd.groupByKey()
+  }
+
+  def testRddToAsyncRDDActions(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD
+    rdd.countAsync()
+  }
+
+  def testRddToSequenceFileRDDFunctions(): Unit = {
+    // TODO: eliminating `import intToIntWritable` requires refactoring SequenceFileRDDFunctions.
+    // That will be a breaking change.
+    import org.apache.spark.SparkContext.intToIntWritable
+    val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
+    rdd.saveAsSequenceFile("/a/test/path")
+  }
+
+  def testRddToOrderedRDDFunctions(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
+    rdd.sortByKey()
+  }
+
+  def testDoubleRDDToDoubleRDDFunctions(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[Double] = mockRDD
+    rdd.stats()
+  }
+
+  def testNumericRDDToDoubleRDDFunctions(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD
+    rdd.stats()
+  }
+
+  def testDoubleAccumulatorParam(): Unit = {
+    val sc = mockSparkContext
+    sc.accumulator(123.4)
+  }
+
+  def testIntAccumulatorParam(): Unit = {
+    val sc = mockSparkContext
+    sc.accumulator(123)
+  }
+
+  def testLongAccumulatorParam(): Unit = {
+    val sc = mockSparkContext
+    sc.accumulator(123L)
+  }
+
+  def testFloatAccumulatorParam(): Unit = {
+    val sc = mockSparkContext
+    sc.accumulator(123F)
+  }
+
+  def testIntWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Int, Int]("/a/test/path")
+  }
+
+  def testLongWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Long, Long]("/a/test/path")
+  }
+
+  def testDoubleWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Double, Double]("/a/test/path")
+  }
+
+  def testFloatWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Float, Float]("/a/test/path")
+  }
+
+  def testBooleanWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Boolean, Boolean]("/a/test/path")
+  }
+
+  def testBytesWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[Array[Byte], Array[Byte]]("/a/test/path")
+  }
+
+  def testStringWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[String, String]("/a/test/path")
+  }
+
+  def testWritableWritableConverter(): Unit = {
+    val sc = mockSparkContext
+    sc.sequenceFile[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]("/a/test/path")
+  }
+}
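
A usage sketch, not part of the patch (the object name, app name, and master setting below are illustrative): on Spark 1.2 or later the following compiles without `import org.apache.spark.SparkContext._`, because the compiler now resolves the conversions from the `RDD` and `AccumulatorParam` companion objects; on earlier versions the same code needs the import.

// Minimal sketch assuming Spark 1.2+; names and settings are illustrative, not from the patch.
import org.apache.spark.{SparkConf, SparkContext}

object ImplicitResolutionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("implicit-sketch").setMaster("local[2]"))

    // reduceByKey comes from PairRDDFunctions, resolved via RDD.rddToPairRDDFunctions.
    val counts = sc.parallelize(Seq("a", "b", "a")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.collect()

    // The Int accumulator picks up AccumulatorParam.IntAccumulatorParam automatically.
    val acc = sc.accumulator(0)
    sc.parallelize(1 to 10).foreach(acc += _)

    sc.stop()
  }
}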