diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index fe67eb871abf564697d365dcf79456e1881b4633..3910c7b09e915c173c41c8d6b96bc427d2b6aea1 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -46,9 +46,15 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
             idInJob, bytes.size, timeTaken))
         val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
         val result: Any = deserializedTask.run(attemptId)
+
+        // Serialize and deserialize the result to emulate what the mesos
+        // executor does. This is useful to catch serialization errors early
+        // on in development (so when users move their local Spark programs
+        // to the cluster, they don't get surprised by serialization errors).
+        val resultToReturn = ser.deserialize[Any](ser.serialize(result))
         val accumUpdates = Accumulators.values
         logInfo("Finished task " + idInJob)
-        taskEnded(task, Success, result, accumUpdates)
+        taskEnded(task, Success, resultToReturn, accumUpdates)
       } catch {
         case t: Throwable => {
           logError("Exception in task " + idInJob, t)
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index ab21f6a6f063be7f810ba41293f4514e013054fb..75df4bee0948f9f7d4c4e9141873c025d8d582a3 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -65,5 +65,21 @@ class FailureSuite extends FunSuite {
     FailureSuiteState.clear()
   }
 
+  test("failure because task results are not serializable") {
+    val sc = new SparkContext("local[1,1]", "test")
+    val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
+
+    val thrown = intercept[spark.SparkException] {
+      results.collect()
+    }
+    assert(thrown.getClass === classOf[spark.SparkException])
+    assert(thrown.getMessage.contains("NotSerializableException"))
+
+    sc.stop()
+    FailureSuiteState.clear()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
+
+