Commit 638927b7 authored by Matei Zaharia

Merge pull request #683 from shivaram/sbt-test-fix

Remove some stack traces from sbt test output
parents 3c131783 4af0d63c
@@ -45,6 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     // Stop the workers before the master so they don't get upset that it disconnected
     workerActorSystems.foreach(_.shutdown())
     workerActorSystems.foreach(_.awaitTermination())
 
     masterActorSystems.foreach(_.shutdown())
+    masterActorSystems.foreach(_.awaitTermination())
   }
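In Akka 2.x, shutdown() only requests that an ActorSystem terminate; awaitTermination() blocks until it actually has. Waiting on the master's system as well keeps its disassociation stack traces from spilling into whatever runs next. A minimal standalone sketch of the same ordering (the system names are illustrative, not from this commit):

import akka.actor.ActorSystem

object ShutdownOrderingSketch {
  def main(args: Array[String]) {
    // Hypothetical systems standing in for the worker and master systems above.
    val worker = ActorSystem("worker")
    val master = ActorSystem("master")

    // shutdown() is asynchronous: it only asks the system to stop.
    worker.shutdown()
    // awaitTermination() blocks until the system's dispatcher has halted.
    worker.awaitTermination()

    // Stopping and awaiting the master last mirrors the ordering above, so
    // the master never logs about workers disappearing mid-shutdown.
    master.shutdown()
    master.awaitTermination()
  }
}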
@@ -2,6 +2,9 @@ package spark
 
 import java.io.File
 
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -27,6 +30,7 @@ class DriverSuite extends FunSuite with Timeouts {
  */
 object DriverWithoutCleanup {
   def main(args: Array[String]) {
+    Logger.getRootLogger().setLevel(Level.WARN)
     val sc = new SparkContext(args(0), "DriverWithoutCleanup")
     sc.parallelize(1 to 100, 4).count()
   }
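The new Logger.getRootLogger().setLevel(Level.WARN) call raises the log4j root level before the SparkContext emits its first log line, so routine INFO output from the forked driver never reaches the sbt log. A hedged sketch of the same pattern in isolation (QuietDriverSketch and the "local" master string are hypothetical):

import org.apache.log4j.{Level, Logger}

import spark.SparkContext

object QuietDriverSketch {
  def main(args: Array[String]) {
    // Raise the root logger to WARN before any Spark logging happens.
    Logger.getRootLogger().setLevel(Level.WARN)

    val sc = new SparkContext("local", "QuietDriverSketch")
    try {
      sc.parallelize(1 to 100, 4).count()
    } finally {
      sc.stop()  // DriverWithoutCleanup above deliberately skips this step
    }
  }
}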
@@ -85,7 +85,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
       in.close()
       _ * fileVal + _ * fileVal
     }.collect
-    println(result)
     assert(result.toSet === Set((1,200), (2,300), (3,500)))
   }
 
@@ -2,12 +2,21 @@ package spark
 
 import org.scalatest.Suite
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.BeforeAndAfterAll
+
+import org.jboss.netty.logging.InternalLoggerFactory
+import org.jboss.netty.logging.Slf4JLoggerFactory
 
 /** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
+trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
 
   @transient var sc: SparkContext = _
 
+  override def beforeAll() {
+    InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+    super.beforeAll()
+  }
+
   override def afterEach() {
     resetSparkContext()
     super.afterEach()
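Mixing in BeforeAndAfterAll lets the trait point Netty's InternalLoggerFactory at SLF4J once, before any test runs, so Netty messages obey the same log configuration as the rest of the suite. Suites consume the trait unchanged: tests assign sc themselves and afterEach() stops it. A hypothetical example (SimpleCountSuite and its test body are illustrative):

package spark

import org.scalatest.FunSuite

// Hypothetical suite showing how LocalSparkContext is used: by the time the
// first test runs, beforeAll() has already routed Netty logging through SLF4J.
class SimpleCountSuite extends FunSuite with LocalSparkContext {
  test("count a small RDD") {
    sc = new SparkContext("local", "test")  // afterEach() will stop it
    assert(sc.parallelize(1 to 4).count() === 4)
  }
}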
@@ -67,7 +67,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
   test("pipe with non-zero exit status") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val piped = nums.pipe("cat nonexistent_file")
+    val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
     intercept[SparkException] {
       piped.collect()
     }
   }
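The Seq form of pipe hands its tokens to the subprocess API directly rather than through a shell; the intent of the change is to keep cat's stderr complaint out of the test log while the pipe still fails with a non-zero status. A hedged sketch of the same assertion pattern, assuming the LocalSparkContext trait above and using false, which exits non-zero without printing anything:

package spark

import org.scalatest.FunSuite

// Hypothetical variant of the test above: `false` exits with status 1 and
// writes nothing to stderr, so the failure is silent but still detected.
class PipeFailureSketch extends FunSuite with LocalSparkContext {
  test("pipe through a failing command raises SparkException") {
    sc = new SparkContext("local", "pipe-test")
    val piped = sc.makeRDD(1 to 4, 2).pipe(Seq("false"))
    intercept[SparkException] {
      piped.collect()
    }
  }
}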
@@ -8,7 +8,8 @@ import org.apache.hadoop.conf.Configuration
 
 import java.io._
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import java.util.concurrent.Executors
+import java.util.concurrent.RejectedExecutionException
 
 private[streaming]
 class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
     oos.writeObject(checkpoint)
     oos.close()
     bos.close()
-    executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+    try {
+      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+    } catch {
+      case rej: RejectedExecutionException =>
+        logError("Could not submit checkpoint task to the thread pool executor", rej)
+    }
   }
 
   def stop() {
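A ThreadPoolExecutor rejects tasks submitted after shutdown() by throwing RejectedExecutionException; catching it here turns a noisy stack trace during test teardown into a single logError line. A self-contained sketch of that behavior (object and message names are illustrative):

import java.util.concurrent.{Executors, RejectedExecutionException}

object RejectedExecutionSketch {
  def main(args: Array[String]) {
    val executor = Executors.newFixedThreadPool(1)
    executor.shutdown()

    try {
      // A pool that has been shut down rejects new work by throwing.
      executor.execute(new Runnable {
        def run() { println("never reached") }
      })
    } catch {
      case rej: RejectedExecutionException =>
        System.err.println("Could not submit task after shutdown: " + rej)
    }
  }
}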