Commit b1b9d580 authored by Oleksii Kostyliev, committed by Andrew Or

[SPARK-7233] [CORE] Detect REPL mode once

<h3>Description</h3>
Detect REPL mode once per JVM lifespan.
Previously, the presence of interpreter mode was checked every time a job was submitted. When many short-lived jobs were executed, this caused heavy mutual blocking between submission threads.

For more details please refer to https://issues.apache.org/jira/browse/SPARK-7233.
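The essence of the fix is that a Scala `lazy val` is initialized at most once per JVM (under an internal lock), so concurrent job submissions no longer repeat the reflective lookup. A minimal sketch of the idea — `ReplDetector`, `lookups`, and `Demo` are illustrative names, not Spark's actual code:

```scala
// Hypothetical sketch: memoize an expensive reflective check in a lazy val.
object ReplDetector {
  private var lookups = 0 // counts how often the reflective check actually runs

  lazy val isInInterpreter: Boolean = {
    lookups += 1
    try {
      // The REPL publishes `spark.repl.Main`; absence means no interpreter.
      Class.forName("spark.repl.Main")
      true
    } catch {
      case _: ClassNotFoundException => false
    }
  }

  def lookupCount: Int = lookups
}

object Demo extends App {
  // Many "job submissions" query the flag concurrently ...
  val threads = (1 to 8).map(_ => new Thread(() => ReplDetector.isInInterpreter))
  threads.foreach(_.start())
  threads.foreach(_.join())
  // ... but the underlying check runs exactly once per JVM.
  println(s"isInInterpreter=${ReplDetector.isInInterpreter}, lookups=${ReplDetector.lookupCount}")
}
```

Because lazy-val initialization is thread-safe, only the first caller pays for the reflection; every later call is a cheap field read.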

<h3>Notes</h3>
* I inverted the return value in the exception case from `true` to `false`: if the REPL class is not found, it seems more logical to assume we are not in interpreter mode.
* Personally, I would call `classForName` with just the Spark classloader (`org.apache.spark.util.Utils#getSparkClassLoader`), but `org.apache.spark.util.Utils#getContextOrSparkClassLoader` is said to be preferable.
* I struggled to come up with a concise, readable, and clear unit test; suggestions are welcome.
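On the classloader note above: the "context or Spark" strategy prefers the calling thread's context classloader and falls back to the loader that loaded the utility class itself. A simplified sketch of that intent — `Loaders` and `contextOrOwnClassLoader` are hypothetical names, not Spark's actual implementation:

```scala
// Sketch: prefer the thread context classloader, fall back to our own,
// mirroring the intent of Utils.getContextOrSparkClassLoader (simplified).
object Loaders {
  def contextOrOwnClassLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  def classForName(name: String): Class[_] =
    Class.forName(name, /* initialize = */ true, contextOrOwnClassLoader)
}
```

Using the context classloader matters in environments (app servers, REPLs) where user classes are not visible to the framework's own loader.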

Author: Oleksii Kostyliev <etander@gmail.com>
Author: Oleksii Kostyliev <okostyliev@thunderhead.com>

Closes #5835 from preeze/SPARK-7233 and squashes the following commits:

69bb9e4 [Oleksii Kostyliev] SPARK-7527: fixed explanatory comment to meet style-checker requirements
26dcc24 [Oleksii Kostyliev] SPARK-7527: fixed explanatory comment to meet style-checker requirements
c6f9685 [Oleksii Kostyliev] Merge remote-tracking branch 'remotes/upstream/master' into SPARK-7233
b78a983 [Oleksii Kostyliev] SPARK-7527: revert the fix and let it be addressed separately at a later stage
b64d441 [Oleksii Kostyliev] SPARK-7233: inline inInterpreter parameter into instantiateClass
86e2606 [Oleksii Kostyliev] SPARK-7233, SPARK-7527: Handle interpreter mode properly.
c7ee69c [Oleksii Kostyliev] Merge remote-tracking branch 'upstream/master' into SPARK-7233
d6c07fc [Oleksii Kostyliev] SPARK-7233: properly handle the inverted meaning of isInInterpreter
c319039 [Oleksii Kostyliev] SPARK-7233: move inInterpreter to Utils and make it lazy
parent 8f4aaba0
@@ -239,15 +239,6 @@ private[spark] object ClosureCleaner extends Logging {
     logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
     accessedFields.foreach { f => logDebug("     " + f) }
-    val inInterpreter = {
-      try {
-        val interpClass = Class.forName("spark.repl.Main")
-        interpClass.getMethod("interp").invoke(null) != null
-      } catch {
-        case _: ClassNotFoundException => true
-      }
-    }
     // List of outer (class, object) pairs, ordered from outermost to innermost
     // Note that all outer objects but the outermost one (first one in this list) must be closures
     var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
@@ -274,7 +265,7 @@ private[spark] object ClosureCleaner extends Logging {
           // required fields from the original object. We need the parent here because the Java
           // language specification requires the first constructor parameter of any closure to be
           // its enclosing object.
-          val clone = instantiateClass(cls, parent, inInterpreter)
+          val clone = instantiateClass(cls, parent)
           for (fieldName <- accessedFields(cls)) {
             val field = cls.getDeclaredField(fieldName)
             field.setAccessible(true)
@@ -327,9 +318,8 @@ private[spark] object ClosureCleaner extends Logging {
   private def instantiateClass(
       cls: Class[_],
-      enclosingObject: AnyRef,
-      inInterpreter: Boolean): AnyRef = {
-    if (!inInterpreter) {
+      enclosingObject: AnyRef): AnyRef = {
+    if (!Utils.isInInterpreter) {
       // This is a bona fide closure class, whose constructor has no effects
       // other than to set its fields, so use its constructor
       val cons = cls.getConstructors()(0)
...
@@ -1795,6 +1795,20 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  lazy val isInInterpreter: Boolean = {
+    try {
+      val interpClass = classForName("spark.repl.Main")
+      interpClass.getMethod("interp").invoke(null) != null
+    } catch {
+      // Returning true seems to be a mistake.
+      // Currently changing it to false causes test failures in Streaming.
+      // For a more detailed discussion, please refer to
+      // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments.
+      // Addressing this change is tracked as https://issues.apache.org/jira/browse/SPARK-7527
+      case _: ClassNotFoundException => true
+    }
+  }
+
   /**
    * Return a well-formed URI for the file described by a user input string.
    *
...