diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 0f9cbe213ea17d90840ba5297679ea2683808972..6ea045198e2ced7346c14c150f7e37f4ad10fc03 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -379,6 +379,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("mapWith") { import java.util.Random val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) + @deprecated("suppress compile time deprecation warning", "1.0.0") val randoms = ones.mapWith( (index: Int) => new Random(index + 42)) {(t: Int, prng: Random) => prng.nextDouble * t}.collect() @@ -397,6 +398,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("flatMapWith") { import java.util.Random val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) + @deprecated("suppress compile time deprecation warning", "1.0.0") val randoms = ones.flatMapWith( (index: Int) => new Random(index + 42)) {(t: Int, prng: Random) => @@ -418,6 +420,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("filterWith") { import java.util.Random val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) + @deprecated("suppress compile time deprecation warning", "1.0.0") val sample = ints.filterWith( (index: Int) => new Random(index + 42)) {(t: Int, prng: Random) => prng.nextInt(3) == 0}. diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala index 5d15a68ac7e4fe31571b02e020fbba8a959cea3f..aad6599589420d4d2081ee79c0b53e2f3aae2a2f 100644 --- a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala @@ -15,15 +15,12 @@ * limitations under the License. */ -package org.apache.spark.serializer; - -import java.io.NotSerializableException +package org.apache.spark.serializer import org.scalatest.FunSuite +import org.apache.spark.{SharedSparkContext, SparkException} import org.apache.spark.rdd.RDD -import org.apache.spark.SparkException -import org.apache.spark.SharedSparkContext /* A trivial (but unserializable) container for trivial functions */ class UnserializableClass { @@ -38,52 +35,50 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex test("throws expected serialization exceptions on actions") { val (data, uc) = fixture - val ex = intercept[SparkException] { - data.map(uc.op(_)).count + data.map(uc.op(_)).count() } - assert(ex.getMessage.contains("Task not serializable")) } // There is probably a cleaner way to eliminate boilerplate here, but we're // iterating over a map from transformation names to functions that perform that // transformation on a given RDD, creating one test case for each - + for (transformation <- - Map("map" -> xmap _, "flatMap" -> xflatMap _, "filter" -> xfilter _, - "mapWith" -> xmapWith _, "mapPartitions" -> xmapPartitions _, + Map("map" -> xmap _, + "flatMap" -> xflatMap _, + "filter" -> xfilter _, + "mapPartitions" -> xmapPartitions _, "mapPartitionsWithIndex" -> xmapPartitionsWithIndex _, - "mapPartitionsWithContext" -> xmapPartitionsWithContext _, - "filterWith" -> xfilterWith _)) { + "mapPartitionsWithContext" -> xmapPartitionsWithContext _)) { val (name, xf) = transformation - + test(s"$name transformations throw proactive serialization exceptions") { val (data, uc) = fixture - val ex = intercept[SparkException] { xf(data, uc) } - assert(ex.getMessage.contains("Task not serializable"), s"RDD.$name doesn't proactively throw NotSerializableException") } } - + private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] = x.map(y=>uc.op(y)) - private def xmapWith(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.mapWith(x => x.toString)((x,y)=>x + uc.op(y)) + private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] = x.flatMap(y=>Seq(uc.op(y))) + private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] = x.filter(y=>uc.pred(y)) - private def xfilterWith(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.filterWith(x => x.toString)((x,y)=>uc.pred(y)) + private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitions(_.map(y=>uc.op(y))) + private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y))) + private def xmapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitionsWithContext((_, it) => it.map(y=>uc.op(y))) diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index ca37d707b06ca126f092bc49a6a40579acc8b1e7..d2bee448d4d3b320201c46470e7f85a5a75956ef 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -135,12 +135,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging { val testOutputStream = new PipedOutputStream() val testInputStream = new PipedInputStream(testOutputStream) val appender = FileAppender(testInputStream, testFile, conf) - assert(appender.isInstanceOf[ExpectedAppender]) + //assert(appender.getClass === classTag[ExpectedAppender].getClass) assert(appender.getClass.getSimpleName === classTag[ExpectedAppender].runtimeClass.getSimpleName) if (appender.isInstanceOf[RollingFileAppender]) { val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy - rollingPolicy.isInstanceOf[ExpectedRollingPolicy] val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) { rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis } else { diff --git a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala index 7006571ef0ef6d36503ba5b176212f56455e344d..794a55d61750b0199ccf10a1c0da4a232375f69c 100644 --- a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.FunSuite /** * Tests org.apache.spark.util.Vector functionality */ +@deprecated("suppress compile time deprecation warning", "1.0.0") class VectorSuite extends FunSuite { def verifyVector(vector: Vector, expectedLength: Int) = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index cc4a65011dd723ec62c38214115e0e6487edd928..952a74fd5f6dedea6d6bf9f22516e9c2458f88ae 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -383,7 +383,10 @@ class TestActor(port: Int) extends Actor with ActorHelper { def bytesToString(byteString: ByteString) = byteString.utf8String - override def preStart = IOManager(context.system).connect(new InetSocketAddress(port)) + override def preStart(): Unit = { + @deprecated("suppress compile time deprecation warning", "1.0.0") + val unit = IOManager(context.system).connect(new InetSocketAddress(port)) + } def receive = { case IO.Read(socket, bytes) =>