diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index ec59b4f48fddd8bb6887dc4bfdaef52c720ca311..16b00d15aab2e5ae1c3a69ab40d040f2ece3e787 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -5,6 +5,7 @@ import serializer.Serializer import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem} import akka.remote.RemoteActorRefProvider +import akka.event.{Logging => AkkaLogging} import spark.broadcast.BroadcastManager import spark.storage.BlockManager @@ -51,6 +52,7 @@ class SparkEnv ( broadcastManager.stop() blockManager.stop() blockManager.master.stop() + actorSystem.eventStream.setLogLevel(AkkaLogging.ErrorLevel) actorSystem.shutdown() // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut // down, but let's call it anyway in case it gets fixed in a later release diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 55bb61b0ccf88918a691684871c7627d064d99f6..cb85419ae43b560e6fb4f049ec3b9ba31c8068ed 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -1,6 +1,7 @@ package spark.deploy import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.event.{Logging => AkkaLogging} import spark.deploy.worker.Worker import spark.deploy.master.Master @@ -43,8 +44,11 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I def stop() { logInfo("Shutting down local Spark cluster.") // Stop the workers before the master so they don't get upset that it disconnected + workerActorSystems.foreach(_.eventStream.setLogLevel(AkkaLogging.ErrorLevel)) workerActorSystems.foreach(_.shutdown()) workerActorSystems.foreach(_.awaitTermination()) + + masterActorSystems.foreach(_.eventStream.setLogLevel(AkkaLogging.ErrorLevel)) masterActorSystems.foreach(_.shutdown()) masterActorSystems.foreach(_.awaitTermination()) } diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index f1a35bced3ad7df19f848709811feeed85feca7f..9c24ca430daf4c3301a3f869ff6f9f22acc7c4a2 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -85,7 +85,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext { in.close() _ * fileVal + _ * fileVal }.collect - println(result) assert(result.toSet === Set((1,200), (2,300), (3,500))) } diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala index 76d5258b02b9712fae6d5a5935f6036a1a23fb44..bd184222ed30176d8e920dcfe65e36a0bf97dfc7 100644 --- a/core/src/test/scala/spark/LocalSparkContext.scala +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -2,12 +2,21 @@ package spark import org.scalatest.Suite import org.scalatest.BeforeAndAfterEach +import org.scalatest.BeforeAndAfterAll + +import org.jboss.netty.logging.InternalLoggerFactory +import org.jboss.netty.logging.Slf4JLoggerFactory /** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */ -trait LocalSparkContext extends BeforeAndAfterEach { self: Suite => +trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite => @transient var sc: SparkContext = _ + override def beforeAll() { + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + super.beforeAll() + } + override def afterEach() { resetSparkContext() super.afterEach()