diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d35b2b1cac7a4f634bb4fcf369fb075b41a548cf..2e8cb609b262967fc371997cf14bd0729226cf75 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -63,7 +63,7 @@ class SparkContext(
     System.setProperty("spark.master.port", "0")
   }
 
-  private val isLocal = master.startsWith("local") // TODO: better check for local
+  private val isLocal = (master == "local" || master.startsWith("local["))
 
   // Create the Spark execution environment (cache, map output tracker, etc)
   val env = SparkEnv.createFromSystemProperties(
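Note: the old check treated any master string beginning with "local" as local mode, including something like "localhost:5050" pointing at a real Mesos master; the new form accepts only the bare "local" string or the bracketed thread-count variant. A minimal sketch of the resulting behavior (the helper name is illustrative, not part of the patch):

    // Mirrors the new isLocal predicate; only "local" and "local[N]" count as local.
    def looksLocal(master: String): Boolean =
      master == "local" || master.startsWith("local[")

    looksLocal("local")          // true  -- single-threaded local mode
    looksLocal("local[4]")       // true  -- local mode with 4 threads
    looksLocal("localhost:5050") // false -- previously misclassified by startsWith("local")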
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index ad02b8525425599474d91ef5d84fb67bf1e5881b..ac30ae9aeca10a80055d6847beaf037667e306c7 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -69,17 +69,19 @@ class Executor extends Logging {
         val value = task.run(taskId.toInt)
         val accumUpdates = Accumulators.values
         val result = new TaskResult(value, accumUpdates)
-        context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(result))
+        val serializedResult = ser.serialize(result)
+        logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
+        context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
         logInfo("Finished task ID " + taskId)
       } catch {
         case ffe: FetchFailedException => {
           val reason = ffe.toTaskEndReason
-          context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(reason))
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }
 
         case t: Throwable => {
           val reason = ExceptionFailure(t)
-          context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(reason))
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // TODO: Should we exit the whole executor here? On the one hand, the failed task may
           // have left some weird state around depending on when the exception was thrown, but on
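Note: with the executor now reporting TaskState.FAILED for fetch failures and uncaught exceptions (instead of FINISHED for everything), the driver side can dispatch on the state alone rather than deserializing the payload to discover a failure. A rough sketch of that dispatch, assuming a separate handler for failed tasks (only the FINISHED branch is confirmed by the TaskSetManager hunk below):

    // Hedged sketch of driver-side handling keyed on TaskState; handler names other
    // than taskFinished are assumptions for illustration.
    state match {
      case TaskState.FINISHED => taskFinished(tid, state, serializedData)
      case TaskState.FAILED   => taskFailed(tid, state, serializedData)   // hypothetical handler
      case _                  => // RUNNING and other non-terminal updates
    }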
diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
index 7695cbdfd707bb0b6cb0566de961172662830ba2..f97d9d0bfa1506ada3241b5b89eba1d54cfd6d95 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
@@ -6,6 +6,7 @@ import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => Mesos
 import spark.TaskState.TaskState
 import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
+import spark.TaskState
 
 class MesosExecutorRunner(executor: Executor)
   extends MesosExecutor
@@ -18,7 +19,7 @@ class MesosExecutorRunner(executor: Executor)
     val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
     driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
       .setTaskId(mesosTaskId)
-      .setState(MesosTaskState.TASK_FINISHED)
+      .setState(TaskState.toMesos(state))
       .setData(ByteString.copyFrom(data))
       .build())
   }
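Note: statusUpdate now forwards the actual Spark TaskState through TaskState.toMesos instead of hard-coding TASK_FINISHED, so a failed task is reported as failed to Mesos as well. The toMesos mapping itself is not part of this diff; a plausible sketch, offered purely as an assumption about its shape:

    // Assumed shape of spark.TaskState.toMesos -- not shown in this diff.
    def toMesos(state: TaskState): MesosTaskState = state match {
      case TaskState.RUNNING  => MesosTaskState.TASK_RUNNING
      case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
      case TaskState.FAILED   => MesosTaskState.TASK_FAILED
      case TaskState.KILLED   => MesosTaskState.TASK_KILLED
      case TaskState.LOST     => MesosTaskState.TASK_LOST
    }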
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 75b67a0eb4d235f963cf0915ba91c3830eec67aa..ab07f1c8c2e264eab12e9ff8875362e66f0bde57 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -227,6 +227,7 @@ class TaskSetManager(
   }
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    logInfo("statusUpdate: " + tid + " is now " + state + " " + serializedData)
     state match {
       case TaskState.FINISHED =>
         taskFinished(tid, state, serializedData)
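Note: concatenating serializedData onto the log message relies on ByteBuffer.toString, which prints the buffer's position/limit/capacity rather than its contents, so this line effectively records the size of each status update on the driver side; together with the executor-side "Serialized size of result" message above, it helps spot oversized task results.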
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
index 8131d84fdf8f175b4d9bc7be9eb7351cf807c3f0..f5c35becdaf707e36d93e93ffe9a4b06610a38fa 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
@@ -188,10 +188,12 @@ class MesosScheduler(
       for ((taskList, index) <- taskLists.zipWithIndex) {
         if (!taskList.isEmpty) {
           val offerNum = offerableIndices(index)
+          val slaveId = offers(offerNum).getSlaveId.getValue
+          slaveIdsWithExecutors += slaveId
           mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
           for (taskDesc <- taskList) {
-            taskIdToSlaveId(taskDesc.taskId) = offers(offerNum).getSlaveId.getValue
-            mesosTasks(offerNum).add(createMesosTask(taskDesc, offers(offerNum).getSlaveId))
+            taskIdToSlaveId(taskDesc.taskId) = slaveId
+            mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
           }
         }
       }
@@ -214,7 +216,7 @@ class MesosScheduler(
   }
 
   /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: SlaveID): MesosTaskInfo = {
+  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
     val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
     val cpuResource = Resource.newBuilder()
       .setName("cpus")
@@ -223,7 +225,7 @@ class MesosScheduler(
       .build()
     return MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
-      .setSlaveId(slaveId)
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
       .setExecutor(executorInfo)
       .setName(task.name)
       .addResources(cpuResource)
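Note: createMesosTask now takes the slave id as a plain String and rebuilds the SlaveID protobuf only where the TaskInfo is assembled, which lets the offer loop extract the value once per offer, record it in slaveIdsWithExecutors, and reuse it for both the taskIdToSlaveId map and task construction. The round trip between the two representations, for reference (offer is a hypothetical stand-in for one of the Mesos offers):

    // String form, as stored in taskIdToSlaveId and slaveIdsWithExecutors:
    val slaveId: String = offer.getSlaveId.getValue
    // Protobuf form, rebuilt inside createMesosTask when the TaskInfo is built:
    val slaveIdProto: SlaveID = SlaveID.newBuilder().setValue(slaveId).build()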