diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 28d643abca8f42686f7174cff3aad0d6dd338285..81daacf958b5a03d3135f4587f2d6e411b0c6a3c 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -454,4 +454,25 @@ private object Utils extends Logging {
   def clone[T](value: T, serializer: SerializerInstance): T = {
     serializer.deserialize[T](serializer.serialize(value))
   }
+
+  /**
+   * Detect whether this thread might be executing a shutdown hook. Will always return true if
+   * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
+   * if System.exit was just called by a concurrent thread).
+   *
+   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+   * an IllegalStateException.
+   */
+  def inShutdown(): Boolean = {
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException => return true
+    }
+    return false
+  }
 }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index bd21ba719a77cf2eaca6b2f70002f191daca4653..5de09030aa1b3b4318c1eb6051eed3e1a3fb23ef 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,14 +50,19 @@ private[spark] class Executor extends Logging {
       override def uncaughtException(thread: Thread, exception: Throwable) {
         try {
           logError("Uncaught exception in thread " + thread, exception)
-          if (exception.isInstanceOf[OutOfMemoryError]) {
-            System.exit(ExecutorExitCode.OOM)
-          } else {
-            System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+
+          // We may have been called from a shutdown hook. If so, we must not call System.exit().
+          // (If we do, we will deadlock.)
+          if (!Utils.inShutdown()) {
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+            }
           }
         } catch {
-          case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
-          case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
     }
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index f2f4fd56d1349e4f2cbbda42c077b187b44f6667..41ff62dd2285749b0829b29a8c9293aee4dbbcfd 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -40,3 +40,15 @@ class PartitionPruningRDD[T: ClassManifest](
   override protected def getPartitions: Array[Partition] =
     getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
 }
+
+
+object PartitionPruningRDD {
+
+  /**
+   * Create a PartitionPruningRDD. This function can be used to create the PartitionPruningRDD
+   * when its type T is not known at compile time.
+   */
+  def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
+    new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest)
+  }
+}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 7e5b820cbbdc6ca145c2eb7c6787bd2c137c80d0..ddbf8821ad15aa172cb248ef3aed45edb3e38f33 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -178,7 +178,11 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+        try {
+          localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+        } catch {
+          case t: Throwable => logError("Exception while deleting local spark dirs", t)
+        }
       }
     })
   }
diff --git a/docs/_config.yml b/docs/_config.yml
index 2bd2eecc863e40eba17372829dcb6e0ce5d108e9..09617e4a1efb6b3486970b7fc7715f1fb0993a16 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -7,3 +7,4 @@ SPARK_VERSION: 0.7.0-SNAPSHOT
 SPARK_VERSION_SHORT: 0.7.0
 SCALA_VERSION: 2.9.2
 MESOS_VERSION: 0.9.0-incubating
+SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/contributing-to-spark.md b/docs/contributing-to-spark.md
index 14d0dc856b2f7e31ff90d272c4a335d955f81404..50feeb2d6c42afa14e7092a7902d75e5362d570f 100644
--- a/docs/contributing-to-spark.md
+++ b/docs/contributing-to-spark.md
@@ -15,7 +15,7 @@ The Spark team welcomes contributions in the form of GitHub pull requests. Here
   But first, make sure that you have [configured a spark-env.sh](configuration.html) with at least
   `SCALA_HOME`, as some of the tests try to spawn subprocesses using this.
 - Add new unit tests for your code. We use [ScalaTest](http://www.scalatest.org/) for testing. Just
   add a new Suite in `core/src/test`, or methods to an existing Suite.
-- If you'd like to report a bug but don't have time to fix it, you can still post it to our [issue tracker](https://spark-project.atlassian.net), or email the [mailing list](http://www.spark-project.org/mailing-lists.html).
+- If you'd like to report a bug but don't have time to fix it, you can still post it to our [issue tracker]({{site.SPARK_ISSUE_TRACKER_URL}}), or email the [mailing list](http://www.spark-project.org/mailing-lists.html).
 
 # Licensing of Contributions
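
Note on Utils.inShutdown() and the Executor change: the JVM rejects new shutdown hooks once shutdown has begun, so registering and immediately removing a throwaway hook doubles as a shutdown probe. Below is a minimal standalone sketch of that behavior (the object and method names here are illustrative, not part of the patch):

    object InShutdownProbe {
      // Same trick as Utils.inShutdown(): Runtime#addShutdownHook throws
      // IllegalStateException once the JVM has started shutting down.
      def probe(): Boolean = {
        try {
          val hook = new Thread { override def run() {} }
          Runtime.getRuntime.addShutdownHook(hook)
          Runtime.getRuntime.removeShutdownHook(hook)
          false
        } catch {
          case _: IllegalStateException => true
        }
      }

      def main(args: Array[String]) {
        println("in shutdown? " + probe())  // false: the JVM is still running normally
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run() {
            // Probes made from inside a shutdown hook report true.
            println("in shutdown? " + probe())
          }
        })
      }
    }

This is also why the last-resort catch block in Executor now calls Runtime.getRuntime.halt() instead of System.exit(): exit() waits for shutdown hooks to finish, so calling it from a hook would deadlock (as the new comment notes), whereas halt() terminates the VM immediately without running hooks.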
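
Note on PartitionPruningRDD.create: the class constructor requires a ClassManifest for T through its context bound, so the factory, which reuses rdd.elementClassManifest, is the way to build one when T is only known at runtime. A usage sketch, assuming a local SparkContext and the Spark 0.7 API (the app name and the filter function are illustrative):

    import spark.SparkContext
    import spark.rdd.PartitionPruningRDD

    val sc = new SparkContext("local", "pruning-example")
    val rdd = sc.parallelize(1 to 100, 10)
    // Keep only partition 0; the other nine partitions are never computed.
    val pruned = PartitionPruningRDD.create(rdd, partitionIndex => partitionIndex == 0)
    println(pruned.partitions.length)  // 1
    println(pruned.collect().toSeq)    // elements of the first slice only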