diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 0942fecff3ac2976cbbb0aa4137510b71be0ca0f..d4d80845c52015328ad9b4a2140dba9e145faec9 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -4,6 +4,7 @@
 import java.io.{File, FileOutputStream}
 import java.net.{URI, URL, URLClassLoader}
 import java.util.concurrent._
 
+import scala.actors.remote.RemoteActor
 import scala.collection.mutable.ArrayBuffer
 import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
@@ -25,6 +26,9 @@ class Executor extends mesos.Executor with Logging {
     for ((key, value) <- props)
       System.setProperty(key, value)
 
+    // Make sure an appropriate class loader is set for remote actors
+    RemoteActor.classLoader = getClass.getClassLoader
+
     // Initialize Spark environment (using system properties read above)
     env = SparkEnv.createFromSystemProperties(false)
     SparkEnv.set(env)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1f2bddc60e8df2cd1b67d903ff40baa0a68f4d03..74e6ff1fec14404a652d17ce31c5036afd7d7e5d 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -3,6 +3,7 @@
 import java.io._
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.actors.remote.RemoteActor
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.mapred.InputFormat
@@ -37,6 +38,10 @@ extends Logging {
       System.setProperty("spark.master.host", Utils.localHostName)
     if (System.getProperty("spark.master.port") == null)
       System.setProperty("spark.master.port", "50501")
+
+    // Make sure a proper class loader is set for remote actors (unless user set one)
+    if (RemoteActor.classLoader == null)
+      RemoteActor.classLoader = getClass.getClassLoader
 
     // Create the Spark execution environment (cache, map output tracker, etc)
     val env = SparkEnv.createFromSystemProperties(true)