From 6151d2641f91c8e3ec0c324e78afb46cdb2ef111 Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Tue, 14 Jun 2016 09:40:07 -0700
Subject: [PATCH] [MINOR] Clean up several build warnings, mostly due to
 internal use of old accumulators

## What changes were proposed in this pull request?

Another PR to clean up recent build warnings. This particularly cleans up several instances of the old accumulator API usage in tests that are straightforward to update. I think this qualifies as "minor".

## How was this patch tested?

Jenkins

Author: Sean Owen <sowen@cloudera.com>

Closes #13642 from srowen/BuildWarnings.
---
 core/pom.xml                                  |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala   |  12 +-
 .../scheduler/SchedulerIntegrationSuite.scala |   1 +
 .../spark/scheduler/TaskContextSuite.scala    |   9 +-
 .../spark/sql/execution/debug/package.scala   |  34 +++---
 .../execution/metric/SQLMetricsSuite.scala    | 105 +-----------------
 .../deploy/yarn/YarnAllocatorSuite.scala      |   1 +
 7 files changed, 32 insertions(+), 136 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index f5fdb40696..90c8f97f2b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -356,12 +356,12 @@
             <phase>generate-resources</phase>
             <configuration>
               <!-- Execute the shell script to generate the spark build information. -->
-              <tasks>
+              <target>
                 <exec executable="${project.basedir}/../build/spark-build-info">
                   <arg value="${project.build.directory}/extra-resources"/>
-                  <arg value="${pom.version}"/>
+                  <arg value="${project.version}"/>
                 </exec>
-              </tasks>
+              </target>
             </configuration>
             <goals>
               <goal>run</goal>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f28f429e0c..3c30ec8ee8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1602,13 +1602,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 
   test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
-    val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
-      override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
-      override def zero(initialValue: Int): Int = 0
-      override def addInPlace(r1: Int, r2: Int): Int = {
-        throw new DAGSchedulerSuiteDummyException
-      }
-    })
+    val acc = new LongAccumulator {
+      override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
+      override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
+    }
+    sc.register(acc)
 
     // Run this on executors
     sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 5271a5671a..54b7312991 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.{Duration, SECONDS}
+import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.scalactic.TripleEquals
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 368668bc7e..9eda79ace1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -146,14 +146,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
   test("accumulators are updated on exception failures") {
     // This means use 1 core and 4 max task failures
     sc = new SparkContext("local[1,4]", "test")
-    val param = AccumulatorParam.LongAccumulatorParam
     // Create 2 accumulators, one that counts failed values and another that doesn't
-    val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
-    val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", true)
+    val acc2 = AccumulatorSuite.createLongAccum("y", false)
     // Fail first 3 attempts of every task. This means each task should be run 4 times.
     sc.parallelize(1 to 10, 10).map { i =>
-      acc1 += 1
-      acc2 += 1
+      acc1.add(1)
+      acc2.add(1)
       if (TaskContext.get.attemptNumber() <= 2) {
         throw new Exception("you did something wrong")
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index f2c558ac2d..e89f792496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable.HashSet
 
-import org.apache.spark.{Accumulator, AccumulatorParam}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
  * Contains methods for debugging query execution.
@@ -108,26 +107,27 @@ package object debug {
   private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
     def output: Seq[Attribute] = child.output
 
-    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
-      def zero(initialValue: HashSet[String]): HashSet[String] = {
-        initialValue.clear()
-        initialValue
-      }
-
-      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
-        v1 ++= v2
-        v1
+    class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
+      private val _set = new HashSet[T]()
+      override def isZero: Boolean = _set.isEmpty
+      override def copy(): AccumulatorV2[T, HashSet[T]] = {
+        val newAcc = new SetAccumulator[T]()
+        newAcc._set ++= _set
+        newAcc
       }
+      override def reset(): Unit = _set.clear()
+      override def add(v: T): Unit = _set += v
+      override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
+      override def value: HashSet[T] = _set
     }
 
     /**
      * A collection of metrics for each column of output.
-     *
-     * @param elementTypes the actual runtime types for the output. Useful when there are bugs
-     *        causing the wrong data to be projected.
      */
-    case class ColumnMetrics(
-        elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+    case class ColumnMetrics() {
+      val elementTypes = new SetAccumulator[String]
+      sparkContext.register(elementTypes)
+    }
 
     val tupleCount: LongAccumulator = sparkContext.longAccumulator
 
@@ -155,7 +155,7 @@ package object debug {
       while (i < numColumns) {
         val value = currentRow.get(i, output(i).dataType)
         if (value != null) {
-          columnStats(i).elementTypes += HashSet(value.getClass.getName)
+          columnStats(i).elementTypes.add(value.getClass.getName)
         }
         i += 1
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index fd956bc4ef..579a095ff0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -17,13 +17,6 @@
 
 package org.apache.spark.sql.execution.metric
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import scala.collection.mutable
-
-import org.apache.xbean.asm5._
-import org.apache.xbean.asm5.Opcodes._
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.SparkPlanInfo
@@ -31,34 +24,11 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
-
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
 
 class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._
 
-  test("SQLMetric should not box Long") {
-    val l = SQLMetrics.createMetric(sparkContext, "long")
-    val f = () => {
-      l += 1L
-      l.add(1L)
-    }
-    val cl = BoxingFinder.getClassReader(f.getClass)
-    val boxingFinder = new BoxingFinder()
-    cl.accept(boxingFinder, 0)
-    assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
-  }
-
-  test("Normal accumulator should do boxing") {
-    // We need this test to make sure BoxingFinder works.
-    val l = sparkContext.accumulator(0L)
-    val f = () => { l += 1L }
-    val cl = BoxingFinder.getClassReader(f.getClass)
-    val boxingFinder = new BoxingFinder()
-    cl.accept(boxingFinder, 0)
-    assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
-  }
-
   /**
    * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
   *
@@ -323,76 +293,3 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   }
 }
-
-private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
-
-/**
- * If `method` is null, search all methods of this class recursively to find if they do some boxing.
- * If `method` is specified, only search this method of the class to speed up the searching.
- *
- * This method will skip the methods in `visitedMethods` to avoid potential infinite cycles.
- */
-private class BoxingFinder(
-    method: MethodIdentifier[_] = null,
-    val boxingInvokes: mutable.Set[String] = mutable.Set.empty,
-    visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty)
-  extends ClassVisitor(ASM5) {
-
-  private val primitiveBoxingClassName =
-    Set("java/lang/Long",
-      "java/lang/Double",
-      "java/lang/Integer",
-      "java/lang/Float",
-      "java/lang/Short",
-      "java/lang/Character",
-      "java/lang/Byte",
-      "java/lang/Boolean")
-
-  override def visitMethod(
-      access: Int, name: String, desc: String, sig: String, exceptions: Array[String]):
-    MethodVisitor = {
-    if (method != null && (method.name != name || method.desc != desc)) {
-      // If method is specified, skip other methods.
-      return new MethodVisitor(ASM5) {}
-    }
-
-    new MethodVisitor(ASM5) {
-      override def visitMethodInsn(
-          op: Int, owner: String, name: String, desc: String, itf: Boolean) {
-        if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") {
-          if (primitiveBoxingClassName.contains(owner)) {
-            // Find boxing methods, e.g, new java.lang.Long(l) or java.lang.Long.valueOf(l)
-            boxingInvokes.add(s"$owner.$name")
-          }
-        } else {
-          // scalastyle:off classforname
-          val classOfMethodOwner = Class.forName(owner.replace('/', '.'), false,
-            Thread.currentThread.getContextClassLoader)
-          // scalastyle:on classforname
-          val m = MethodIdentifier(classOfMethodOwner, name, desc)
-          if (!visitedMethods.contains(m)) {
-            // Keep track of visited methods to avoid potential infinite cycles
-            visitedMethods += m
-            val cl = BoxingFinder.getClassReader(classOfMethodOwner)
-            visitedMethods += m
-            cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0)
-          }
-        }
-      }
-    }
-  }
-}
-
-private object BoxingFinder {
-
-  def getClassReader(cls: Class[_]): ClassReader = {
-    val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
-    val resourceStream = cls.getResourceAsStream(className)
-    val baos = new ByteArrayOutputStream(128)
-    // Copy data over, before delegating to ClassReader -
-    // else we can run out of open file handles.
-    Utils.copyStream(resourceStream, baos, true)
-    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
-  }
-
-}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index f4f8bd435d..207dbf56d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -111,6 +111,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
 
   def createContainer(host: String): Container = {
+    // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
    val containerId = ContainerId.newInstance(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
-- 
GitLab
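
For context on the migration pattern these hunks apply (old `Accumulator`/`AccumulatorParam` replaced by `AccumulatorV2`), here is a minimal, illustrative sketch against the Spark 2.x API. It is not part of the patch; the `StringSetAccumulator` and `AccumulatorV2Example` names and the accumulator labels are made up for the example, and the custom class simply mirrors the `SetAccumulator` added in debug/package.scala above.

```scala
import scala.collection.mutable.HashSet

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

// Illustrative custom accumulator: collects distinct strings contributed by tasks
// into a HashSet that can be read on the driver after an action completes.
class StringSetAccumulator extends AccumulatorV2[String, HashSet[String]] {
  private val _set = new HashSet[String]()
  override def isZero: Boolean = _set.isEmpty
  override def copy(): AccumulatorV2[String, HashSet[String]] = {
    val newAcc = new StringSetAccumulator
    newAcc._set ++= _set
    newAcc
  }
  override def reset(): Unit = _set.clear()
  override def add(v: String): Unit = _set += v
  override def merge(other: AccumulatorV2[String, HashSet[String]]): Unit = _set ++= other.value
  override def value: HashSet[String] = _set
}

object AccumulatorV2Example {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-v2-example"))

    // Built-in replacement for the deprecated sc.accumulator(0L): a named LongAccumulator.
    val counter = sc.longAccumulator("records")

    // Custom AccumulatorV2 instances must be registered before they are used in a job.
    val labels = new StringSetAccumulator
    sc.register(labels, "labels")

    sc.parallelize(1 to 10, 2).foreach { i =>
      counter.add(1)                              // old API: counter += 1
      labels.add(if (i % 2 == 0) "even" else "odd")
    }

    assert(counter.sum == 10)
    println(labels.value)
    sc.stop()
  }
}
```

`AccumulatorV2` requires `copy`, `reset`, and `merge` because Spark ships a zeroed copy of each registered accumulator to every task and merges the per-task results back into the driver-side instance, which is also why custom accumulators are registered with the `SparkContext` (via `sc.register`) before use.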