Commit 1d04c86f authored by Wenchen Fan, committed by Andrew Or

[SPARK-14558][CORE] In ClosureCleaner, clean the outer pointer if it's a REPL line object

## What changes were proposed in this pull request?

When we clean a closure, if its outermost enclosing object is not a closure, we don't clone and clean it, since cloning a user's objects is dangerous. However, if the outermost object is a REPL line object, which may carry a lot of unnecessary references (like the Hadoop conf, the Spark conf, etc.), we should clean it, as it is not a user object.

This PR improves the check for user's objects so that REPL line objects are excluded from it and are therefore cloned and cleaned.
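For context (added here, not part of the original description): the Scala REPL compiles each input line into a generated wrapper class whose name starts with `$line`, and a closure defined on a later line can hold an outer pointer to such a wrapper, and through it to everything defined on earlier lines. Below is a minimal, hypothetical sketch of that shape; real REPL-generated classes have names like `$line3.$read$$iw$$iw` and nest more deeply.

```scala
// Simplified, hypothetical model of REPL line wrapping. Names and structure
// are illustrative only -- the real REPL generates different code.
class LineWrapper {                              // stands in for a "$line..." wrapper
  val unrelatedState = new Array[Byte](1 << 20)  // e.g. a Hadoop conf from an earlier line

  // Compiled as an inner class, this closure can retain an $outer reference
  // to LineWrapper, so serializing it may also drag in unrelatedState.
  val addOne: Int => Int = (x: Int) => x + 1
}
```

With this change, `ClosureCleaner` treats such a wrapper like a closure: it clones it and nulls out the fields the closure does not actually use, instead of keeping the whole wrapper (and everything it references) alive.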

## How was this patch tested?

Existing tests, plus a new regression test added to `ReplSuite` in this patch.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12327 from cloud-fan/closure.
Parent: a46f98d3
@@ -19,7 +19,8 @@ package org.apache.spark.util
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
-import scala.collection.mutable.{Map, Set}
+import scala.collection.mutable.{Map, Set, Stack}
 import scala.language.existentials
 
 import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import org.apache.xbean.asm5.Opcodes._
@@ -77,35 +78,19 @@ private[spark] object ClosureCleaner extends Logging {
    */
   private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
     val seen = Set[Class[_]](obj.getClass)
-    var stack = List[Class[_]](obj.getClass)
+    val stack = Stack[Class[_]](obj.getClass)
     while (!stack.isEmpty) {
-      val cr = getClassReader(stack.head)
-      stack = stack.tail
+      val cr = getClassReader(stack.pop())
       val set = Set[Class[_]]()
       cr.accept(new InnerClosureFinder(set), 0)
       for (cls <- set -- seen) {
         seen += cls
-        stack = cls :: stack
+        stack.push(cls)
       }
     }
     (seen - obj.getClass).toList
   }
 
-  private def createNullValue(cls: Class[_]): AnyRef = {
-    if (cls.isPrimitive) {
-      cls match {
-        case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
-        case java.lang.Character.TYPE => new java.lang.Character('\u0000')
-        case java.lang.Void.TYPE =>
-          // This should not happen because `Foo(void x) {}` does not compile.
-          throw new IllegalStateException("Unexpected void parameter in constructor")
-        case _ => new java.lang.Byte(0: Byte)
-      }
-    } else {
-      null
-    }
-  }
-
   /**
    * Clean the given closure in place.
    *
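A side note on the `getInnerClosureClasses` change above: it is behavior-preserving, replacing an immutable `List` used as a manual stack with `scala.collection.mutable.Stack`. A self-contained sketch of the same traversal pattern, where `neighbors` is a hypothetical stand-in for the ASM-based `InnerClosureFinder` scan:

```scala
import scala.collection.mutable.{Set, Stack}

// Depth-first collection of everything reachable from `start`, mirroring the
// refactored getInnerClosureClasses. `neighbors` is a hypothetical stand-in
// for the bytecode scan that finds classes referenced by a closure class.
def reachableFrom[A](start: A, neighbors: A => Iterable[A]): List[A] = {
  val seen = Set[A](start)
  val stack = Stack[A](start)
  while (stack.nonEmpty) {
    val current = stack.pop()
    for (n <- neighbors(current) if !seen(n)) {
      seen += n
      stack.push(n) // replaces the old `stack = n :: stack` list idiom
    }
  }
  (seen - start).toList
}
```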
@@ -233,16 +218,24 @@
     // Note that all outer objects but the outermost one (first one in this list) must be closures
     var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
     var parent: AnyRef = null
-    if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
-      // The closure is ultimately nested inside a class; keep the object of that
-      // class without cloning it since we don't want to clone the user's objects.
-      // Note that we still need to keep around the outermost object itself because
-      // we need it to clone its child closure later (see below).
-      logDebug(s" + outermost object is not a closure, so do not clone it: ${outerPairs.head}")
-      parent = outerPairs.head._2 // e.g. SparkContext
-      outerPairs = outerPairs.tail
-    } else if (outerPairs.size > 0) {
-      logDebug(s" + outermost object is a closure, so we just keep it: ${outerPairs.head}")
+    if (outerPairs.size > 0) {
+      val (outermostClass, outermostObject) = outerPairs.head
+      if (isClosure(outermostClass)) {
+        logDebug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
+      } else if (outermostClass.getName.startsWith("$line")) {
+        // SPARK-14558: if the outermost object is a REPL line object, we should clone and clean it
+        // as it may carry a lot of unnecessary information, e.g. hadoop conf, spark conf, etc.
+        logDebug(s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
+      } else {
+        // The closure is ultimately nested inside a class; keep the object of that
+        // class without cloning it since we don't want to clone the user's objects.
+        // Note that we still need to keep around the outermost object itself because
+        // we need it to clone its child closure later (see below).
+        logDebug(" + outermost object is not a closure or REPL line object, so do not clone it: " +
+          outerPairs.head)
+        parent = outermostObject // e.g. SparkContext
+        outerPairs = outerPairs.tail
+      }
     } else {
       logDebug(" + there are no enclosing objects!")
     }
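The three branches above amount to a small policy on the outermost object. A standalone, illustrative restatement (the helper and type names here are invented for clarity, not part of the patch):

```scala
// Illustrative restatement of the new outermost-object policy. The "$line"
// prefix matches class names the Scala REPL generates for per-line wrappers
// (e.g. "$line3.$read$$iw$$iw"); ordinary user classes do not use it.
sealed trait OuterAction
case object CloneAndClean extends OuterAction // closures and REPL line objects
case object KeepAsIs extends OuterAction      // genuine user objects

def actionFor(outermostClass: Class[_], isClosure: Class[_] => Boolean): OuterAction =
  if (isClosure(outermostClass)) CloneAndClean
  else if (outermostClass.getName.startsWith("$line")) CloneAndClean // SPARK-14558
  else KeepAsIs
```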
@@ -373,4 +373,31 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("should clone and clean line object in ClosureCleaner") {
+    val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+      """
+        |import org.apache.spark.rdd.RDD
+        |
+        |val lines = sc.textFile("pom.xml")
+        |case class Data(s: String)
+        |val dataRDD = lines.map(line => Data(line.take(3)))
+        |dataRDD.cache.count
+        |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
+        |repartitioned.cache.count
+        |
+        |def getCacheSize(rdd: RDD[_]) = {
+        |  sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
+        |}
+        |val cacheSize1 = getCacheSize(dataRDD)
+        |val cacheSize2 = getCacheSize(repartitioned)
+        |
+        |// The cache size of dataRDD and the repartitioned one should be similar.
+        |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
+        |assert(deviation < 0.2,
+        |  s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
      """.stripMargin)
+    assertDoesNotContain("AssertionError", output)
+    assertDoesNotContain("Exception", output)
+  }
 }
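(A note on why this test catches the regression, assuming the usual REPL compilation scheme: `Data` is declared on a REPL line, so it compiles to an inner class whose instances can hold an `$outer` reference to the line wrapper. If `ClosureCleaner` does not clean that wrapper, the repartitioned, re-cached data keeps the wrapper's extra state reachable, inflating `cacheSize2` relative to `cacheSize1` and tripping the deviation bound.)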